Skip to content

Commit

Permalink
Fix code remarks and add typed stream
Browse files Browse the repository at this point in the history
  • Loading branch information
markaya committed Mar 3, 2023
1 parent 7fe6d4e commit 5bdae16
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ import zio.prelude.Newtype.unsafeWrap
final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {

def findAll(): Task[List[GitHubRepo]] =
elasticsearch.execute(ElasticRequest.search(Index, matchAll)).result[GitHubRepo]
elasticsearch.execute(ElasticRequest.search(Index, matchAll)).documentAs[GitHubRepo]

def findById(organization: String, id: String): Task[Option[GitHubRepo]] =
for {
routing <- routingOf(organization)
res <- elasticsearch.execute(ElasticRequest.getById(Index, DocumentId(id)).routing(routing)).result[GitHubRepo]
res <-
elasticsearch.execute(ElasticRequest.getById(Index, DocumentId(id)).routing(routing)).documentAs[GitHubRepo]
} yield res

def create(repository: GitHubRepo): Task[CreationOutcome] =
Expand Down Expand Up @@ -75,7 +76,7 @@ final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {
} yield res

def search(query: ElasticQuery[_]): Task[List[GitHubRepo]] =
elasticsearch.execute(ElasticRequest.search(Index, query)).result[GitHubRepo]
elasticsearch.execute(ElasticRequest.search(Index, query)).documentAs[GitHubRepo]

private def routingOf(value: String): IO[IllegalArgumentException, Routing.Type] =
Routing.make(value).toZIO.mapError(e => new IllegalArgumentException(e))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import zio.Chunk
import zio.elasticsearch.ElasticQuery._
import zio.schema.Schema
import zio.schema.codec.DecodeError
import zio.stream.{ZPipeline, ZSink}
import zio.stream.{Sink, ZPipeline, ZSink}
import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._
Expand All @@ -37,7 +37,7 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genCustomer) { customer =>
for {
docId <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, customer))
res <- ElasticExecutor.execute(ElasticRequest.getById(index, docId)).result[CustomerDocument]
res <- ElasticExecutor.execute(ElasticRequest.getById(index, docId)).documentAs[CustomerDocument]
} yield assert(res)(isSome(equalTo(customer)))
}
},
Expand Down Expand Up @@ -73,7 +73,7 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genDocumentId, genCustomer) { (documentId, customer) =>
for {
_ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer))
doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).result[CustomerDocument]
doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield assert(doc)(isSome(equalTo(customer)))
}
},
Expand All @@ -82,7 +82,7 @@ object HttpExecutorSpec extends IntegrationSpec {
for {
_ <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer))
_ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer))
doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).result[CustomerDocument]
doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield assert(doc)(isSome(equalTo(secondCustomer)))
}
}
Expand Down Expand Up @@ -137,13 +137,15 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genDocumentId, genCustomer) { (documentId, customer) =>
for {
_ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer))
res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).result[CustomerDocument]
res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield assert(res)(isSome(equalTo(customer)))
}
},
test("return None if the document does not exist") {
checkOnce(genDocumentId) { documentId =>
assertZIO(ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).result[CustomerDocument])(
assertZIO(
ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
)(
isNone
)
}
Expand All @@ -152,7 +154,7 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genDocumentId, genEmployee) { (documentId, employee) =>
val result = for {
_ <- ElasticExecutor.execute(ElasticRequest.upsert[EmployeeDocument](index, documentId, employee))
res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).result[CustomerDocument]
res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield res

assertZIO(result.exit)(
Expand All @@ -179,7 +181,7 @@ object HttpExecutorSpec extends IntegrationSpec {
)
query = range("balance").gte(100)
res <-
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).result[CustomerDocument]
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(isNonEmpty)
}
} @@ around(
Expand All @@ -203,7 +205,9 @@ object HttpExecutorSpec extends IntegrationSpec {
)
query = range("age").gte(0)
res <-
ElasticExecutor.execute(ElasticRequest.search(secondSearchIndex, query)).result[CustomerDocument]
ElasticExecutor
.execute(ElasticRequest.search(secondSearchIndex, query))
.documentAs[CustomerDocument]
} yield res

assertZIO(result.exit)(
Expand Down Expand Up @@ -235,7 +239,7 @@ object HttpExecutorSpec extends IntegrationSpec {
)
query = ElasticQuery.contains("name.keyword", firstCustomer.name.take(3))
res <-
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).result[CustomerDocument]
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(Assertion.contains(firstCustomer))
}
} @@ around(
Expand All @@ -259,7 +263,7 @@ object HttpExecutorSpec extends IntegrationSpec {
)
query = ElasticQuery.startsWith("name.keyword", firstCustomer.name.take(3))
res <-
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).result[CustomerDocument]
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(Assertion.contains(firstCustomer))
}
} @@ around(
Expand All @@ -284,19 +288,19 @@ object HttpExecutorSpec extends IntegrationSpec {
query =
wildcard("name.keyword", s"${firstCustomer.name.take(2)}*${firstCustomer.name.takeRight(2)}")
res <-
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).result[CustomerDocument]
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(Assertion.contains(firstCustomer))
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex, None)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
)
) @@ shrinks(0),
suite("searching documents and returning them as ZStream")(
test("search for document using range query") {
suite("searching documents and returning them as a stream")(
test("search for documents using range query") {
checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) {
(firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) =>
val sink: ZSink[Any, Throwable, RawItem, Nothing, Chunk[RawItem]] = ZSink.collectAll[RawItem]
val sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
Expand All @@ -319,12 +323,12 @@ object HttpExecutorSpec extends IntegrationSpec {
ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex, None)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("search for document using range query with multiple pages") {
test("search for documents using range query with multiple pages") {
checkOnce(genCustomer) { customer =>
def sink: ZSink[Any, Throwable, CustomerDocument, Nothing, Chunk[CustomerDocument]] =
def sink: Sink[Throwable, CustomerDocument, Nothing, Chunk[CustomerDocument]] =
ZSink.collectAll[CustomerDocument]

def pipeline[A: Schema]: ZPipeline[Any, Nothing, RawItem, Either[DecodeError, A]] =
def pipeline[A: Schema]: ZPipeline[Any, Nothing, Item, Either[DecodeError, A]] =
ZPipeline.map(_.documentAs[A])
def pipeline_2[A]: ZPipeline[Any, Nothing, Either[DecodeError, A], A] = ZPipeline.collectWhileRight

Expand All @@ -349,6 +353,35 @@ object HttpExecutorSpec extends IntegrationSpec {
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex, None)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
),
test("search for documents using range query with multiple pages and return type") {
checkOnce(genCustomer) { customer =>
def sink: Sink[Throwable, CustomerDocument, Nothing, Chunk[CustomerDocument]] =
ZSink.collectAll[CustomerDocument]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll))
reqs = (1 to 50).map { _ =>
ElasticRequest.create[CustomerDocument](
secondSearchIndex,
customer.copy(id = Random.alphanumeric.take(5).mkString, balance = 150)
)
}
_ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*))
_ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*))
_ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*))
_ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue)
query = range("balance").gte(100)
res <- ElasticExecutor
.streamAs[CustomerDocument](
ElasticRequest.search(secondSearchIndex, query)
)
.run(sink)
} yield assert(res)(hasSize(Assertion.equalTo(200)))
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex, None)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
)
) @@ shrinks(0),
suite("deleting by query")(
Expand Down Expand Up @@ -388,7 +421,7 @@ object HttpExecutorSpec extends IntegrationSpec {
ElasticExecutor.execute(ElasticRequest.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue)
res <- ElasticExecutor
.execute(ElasticRequest.search(deleteByQueryIndex, matchAll))
.result[CustomerDocument]
.documentAs[CustomerDocument]
} yield assert(res)(hasSameElements(List(firstCustomer.copy(balance = 150))))
}
} @@ around(
Expand Down
10 changes: 2 additions & 8 deletions modules/library/src/main/scala/zio/elasticsearch/Document.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@

package zio.elasticsearch

import zio.json.ast.Json
import zio.schema.Schema
import zio.schema.codec.JsonCodec.JsonDecoder
import zio.schema.codec.{DecodeError, JsonCodec}
import zio.schema.codec.JsonCodec

private[elasticsearch] final case class Document(json: String) {
def decode[A](implicit schema: Schema[A]): Either[DecodeError, A] = JsonDecoder.decode(schema, json)
}
private[elasticsearch] final case class Document(json: String)

private[elasticsearch] object Document {
def from[A](doc: A)(implicit schema: Schema[A]): Document = Document(
JsonCodec.jsonEncoder(schema).encodeJson(doc, indent = None).toString
)

def from(json: Json): Document = new Document(json.toString)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ package zio.elasticsearch
import sttp.client3.SttpBackend
import zio.elasticsearch.ElasticRequest.GetByQuery
import zio.stream.ZStream
import zio.schema.Schema
import zio.{RIO, Task, URLayer, ZIO, ZLayer}

private[elasticsearch] trait ElasticExecutor {
def execute[A](request: ElasticRequest[A]): Task[A]

def stream(request: GetByQuery): ZStream[Any, Throwable, RawItem]
def stream(request: GetByQuery): ZStream[Any, Throwable, Item]

def streamAs[A: Schema](request: GetByQuery): ZStream[Any, Throwable, A]
}

object ElasticExecutor {
Expand All @@ -37,6 +40,9 @@ object ElasticExecutor {
private[elasticsearch] def execute[A](request: ElasticRequest[A]): RIO[ElasticExecutor, A] =
ZIO.serviceWithZIO[ElasticExecutor](_.execute(request))

private[elasticsearch] def stream(request: GetByQuery): ZStream[ElasticExecutor, Throwable, RawItem] =
private[elasticsearch] def stream(request: GetByQuery): ZStream[ElasticExecutor, Throwable, Item] =
ZStream.serviceWithStream[ElasticExecutor](_.stream(request))

private[elasticsearch] def streamAs[A: Schema](request: GetByQuery): ZStream[ElasticExecutor, Throwable, A] =
ZStream.serviceWithStream[ElasticExecutor](_.streamAs[A](request))
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private[elasticsearch] final case class Hits(
total: Total,
@jsonField("max_score")
maxScore: Option[Double] = None,
hits: List[Item]
hits: List[Hit]
)

private[elasticsearch] object Hits {
Expand All @@ -65,7 +65,7 @@ private[elasticsearch] object Total {
implicit val decoder: JsonDecoder[Total] = DeriveJsonDecoder.gen[Total]
}

private[elasticsearch] final case class Item(
private[elasticsearch] final case class Hit(
@jsonField("_index")
index: String,
@jsonField("_type")
Expand All @@ -78,6 +78,6 @@ private[elasticsearch] final case class Item(
source: Json
)

private[elasticsearch] object Item {
implicit val decoder: JsonDecoder[Item] = DeriveJsonDecoder.gen[Item]
private[elasticsearch] object Hit {
implicit val decoder: JsonDecoder[Hit] = DeriveJsonDecoder.gen[Hit]
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ package zio.elasticsearch

import zio.prelude.ZValidation
import zio.schema.Schema
import zio.{Task, ZIO}
import zio.{IO, Task, ZIO}

sealed trait ElasticResult[F[_]] {
def result[A: Schema]: Task[F[A]]
def documentAs[A: Schema]: Task[F[A]]
}

final class GetResult private[elasticsearch] (private val doc: Option[Document]) extends ElasticResult[Option] {
override def result[A: Schema]: Task[Option[A]] =
final class GetResult private[elasticsearch] (private val doc: Option[Item]) extends ElasticResult[Option] {
override def documentAs[A: Schema]: IO[DecodingException, Option[A]] =
ZIO
.fromEither(doc match {
case Some(document) =>
document.decode match {
case Some(item) =>
item.documentAs match {
case Left(e) => Left(DecodingException(s"Could not parse the document: ${e.message}"))
case Right(doc) => Right(Some(doc))
}
Expand All @@ -39,10 +39,10 @@ final class GetResult private[elasticsearch] (private val doc: Option[Document])
.mapError(e => DecodingException(s"Could not parse the document: ${e.message}"))
}

final class SearchResult private[elasticsearch] (private val hits: List[Document]) extends ElasticResult[List] {
override def result[A: Schema]: Task[List[A]] =
final class SearchResult private[elasticsearch] (private val hits: List[Item]) extends ElasticResult[List] {
override def documentAs[A: Schema]: IO[DecodingException, List[A]] =
ZIO.fromEither {
ZValidation.validateAll(hits.map(d => ZValidation.fromEither(d.decode))).toEitherWith { errors =>
ZValidation.validateAll(hits.map(item => ZValidation.fromEither(item.documentAs))).toEitherWith { errors =>
DecodingException(s"Could not parse all documents successfully: ${errors.map(_.message).mkString(",")})")
}
}
Expand Down
Loading

0 comments on commit 5bdae16

Please sign in to comment.