Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Map and phantom type parameter ERT and support streaming #99

Merged
merged 14 commits into from
Mar 6, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ import zio.prelude.Newtype.unsafeWrap
final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {

def findAll(): Task[List[GitHubRepo]] =
elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, matchAll))
elasticsearch.execute(ElasticRequest.search(Index, matchAll)).documentAs[GitHubRepo]

def findById(organization: String, id: String): Task[Option[GitHubRepo]] =
for {
routing <- routingOf(organization)
res <- elasticsearch.execute(ElasticRequest.getById[GitHubRepo](Index, DocumentId(id)).routing(routing))
res <-
elasticsearch.execute(ElasticRequest.getById(Index, DocumentId(id)).routing(routing)).documentAs[GitHubRepo]
} yield res

def create(repository: GitHubRepo): Task[CreationOutcome] =
Expand Down Expand Up @@ -75,7 +76,7 @@ final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {
} yield res

def search(query: ElasticQuery[_]): Task[List[GitHubRepo]] =
elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, query))
elasticsearch.execute(ElasticRequest.search(Index, query)).documentAs[GitHubRepo]

private def routingOf(value: String): IO[IllegalArgumentException, Routing.Type] =
Routing.make(value).toZIO.mapError(e => new IllegalArgumentException(e))
Expand Down
121 changes: 109 additions & 12 deletions modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@

package zio.elasticsearch

import zio.Chunk
import zio.elasticsearch.ElasticQuery._
import zio.stream.{Sink, ZSink}
import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import scala.util.Random

object HttpExecutorSpec extends IntegrationSpec {

def spec: Spec[TestEnvironment, Any] = {
Expand All @@ -31,7 +35,7 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genCustomer) { customer =>
for {
docId <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, customer))
res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, docId))
res <- ElasticExecutor.execute(ElasticRequest.getById(index, docId)).documentAs[CustomerDocument]
} yield assert(res)(isSome(equalTo(customer)))
}
},
Expand Down Expand Up @@ -67,7 +71,7 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genDocumentId, genCustomer) { (documentId, customer) =>
for {
_ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer))
doc <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId))
doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield assert(doc)(isSome(equalTo(customer)))
}
},
Expand All @@ -76,7 +80,7 @@ object HttpExecutorSpec extends IntegrationSpec {
for {
_ <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer))
_ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer))
doc <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId))
doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield assert(doc)(isSome(equalTo(secondCustomer)))
}
}
Expand Down Expand Up @@ -131,20 +135,24 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genDocumentId, genCustomer) { (documentId, customer) =>
for {
_ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer))
res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId))
res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield assert(res)(isSome(equalTo(customer)))
}
},
test("return None if the document does not exist") {
checkOnce(genDocumentId) { documentId =>
assertZIO(ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)))(isNone)
assertZIO(
ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
)(
isNone
)
}
},
test("fail with throwable if decoding fails") {
checkOnce(genDocumentId, genEmployee) { (documentId, employee) =>
val result = for {
_ <- ElasticExecutor.execute(ElasticRequest.upsert[EmployeeDocument](index, documentId, employee))
res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId))
res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
} yield res

assertZIO(result.exit)(
Expand All @@ -170,7 +178,8 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
query = range("balance").gte(100)
res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query))
res <-
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(isNonEmpty)
}
} @@ around(
Expand All @@ -193,7 +202,10 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
query = range("age").gte(0)
res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](secondSearchIndex, query))
res <-
ElasticExecutor
.execute(ElasticRequest.search(secondSearchIndex, query))
.documentAs[CustomerDocument]
} yield res

assertZIO(result.exit)(
Expand Down Expand Up @@ -224,7 +236,8 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
query = ElasticQuery.contains("name.keyword", firstCustomer.name.take(3))
res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query))
res <-
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(Assertion.contains(firstCustomer))
}
} @@ around(
Expand All @@ -247,7 +260,8 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
query = ElasticQuery.startsWith("name.keyword", firstCustomer.name.take(3))
res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query))
res <-
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(Assertion.contains(firstCustomer))
}
} @@ around(
Expand All @@ -271,14 +285,95 @@ object HttpExecutorSpec extends IntegrationSpec {
)
query =
wildcard("name.keyword", s"${firstCustomer.name.take(2)}*${firstCustomer.name.takeRight(2)}")
res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query))
res <-
ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument]
} yield assert(res)(Assertion.contains(firstCustomer))
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
)
) @@ shrinks(0),
suite("searching documents and returning them as a stream")(
test("search for documents using range query") {
checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) {
(firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) =>
val sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <-
ElasticExecutor.execute(
ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer)
)
_ <-
ElasticExecutor.execute(
ElasticRequest
.upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer)
.refreshTrue
)
query = range("balance").gte(100)
res <-
ElasticExecutor.stream(ElasticRequest.search(firstSearchIndex, query)).run(sink)
} yield assert(res)(isNonEmpty)
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("search for documents using range query with multiple pages") {
checkOnce(genCustomer) { customer =>
def sink: Sink[Throwable, Item, Nothing, Chunk[Item]] =
ZSink.collectAll[Item]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll))
reqs = (0 to 203).map { _ =>
ElasticRequest.create[CustomerDocument](
secondSearchIndex,
customer.copy(id = Random.alphanumeric.take(5).mkString, balance = 150)
)
}
_ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue)
query = range("balance").gte(100)
res <- ElasticExecutor
.stream(
ElasticRequest.search(secondSearchIndex, query)
)
.run(sink)
} yield assert(res)(hasSize(equalTo(204)))
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
),
test("search for documents using range query with multiple pages and return type") {
dbulaja98 marked this conversation as resolved.
Show resolved Hide resolved
checkOnce(genCustomer) { customer =>
def sink: Sink[Throwable, CustomerDocument, Nothing, Chunk[CustomerDocument]] =
ZSink.collectAll[CustomerDocument]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll))
reqs = (0 to 200).map { _ =>
ElasticRequest.create[CustomerDocument](
secondSearchIndex,
customer.copy(id = Random.alphanumeric.take(5).mkString, balance = 150)
)
}
_ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue)
query = range("balance").gte(100)
res <- ElasticExecutor
.streamAs[CustomerDocument](
ElasticRequest.search(secondSearchIndex, query)
)
.run(sink)
} yield assert(res)(hasSize(equalTo(201)))
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
)
) @@ shrinks(0),
suite("deleting by query")(
test("successfully delete all matched documents") {
checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer, genDocumentId, genCustomer) {
Expand Down Expand Up @@ -314,7 +409,9 @@ object HttpExecutorSpec extends IntegrationSpec {
deleteQuery = range("balance").gte(300)
_ <-
ElasticExecutor.execute(ElasticRequest.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue)
res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](deleteByQueryIndex, matchAll))
res <- ElasticExecutor
.execute(ElasticRequest.search(deleteByQueryIndex, matchAll))
.documentAs[CustomerDocument]
} yield assert(res)(hasSameElements(List(firstCustomer.copy(balance = 150))))
}
} @@ around(
Expand Down
10 changes: 2 additions & 8 deletions modules/library/src/main/scala/zio/elasticsearch/Document.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@

package zio.elasticsearch

import zio.json.ast.Json
import zio.schema.Schema
import zio.schema.codec.JsonCodec.JsonDecoder
import zio.schema.codec.{DecodeError, JsonCodec}
import zio.schema.codec.JsonCodec

private[elasticsearch] final case class Document(json: String) {
def decode[A](implicit schema: Schema[A]): Either[DecodeError, A] = JsonDecoder.decode(schema, json)
}
private[elasticsearch] final case class Document(json: String)

private[elasticsearch] object Document {
def from[A](doc: A)(implicit schema: Schema[A]): Document = Document(
JsonCodec.jsonEncoder(schema).encodeJson(doc, indent = None).toString
)

def from(json: Json): Document = new Document(json.toString)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@
package zio.elasticsearch

import sttp.client3.SttpBackend
import zio.elasticsearch.ElasticRequest.GetByQuery
import zio.schema.Schema
import zio.stream.{Stream, ZStream}
import zio.{RIO, Task, URLayer, ZIO, ZLayer}

private[elasticsearch] trait ElasticExecutor {
def execute[A](request: ElasticRequest[A, _]): Task[A]
def execute[A](request: ElasticRequest[A]): Task[A]

def stream(request: GetByQuery): Stream[Throwable, Item]

def streamAs[A: Schema](request: GetByQuery): Stream[Throwable, A]
}

object ElasticExecutor {
Expand All @@ -30,6 +37,12 @@ object ElasticExecutor {
lazy val local: URLayer[SttpBackend[Task, Any], ElasticExecutor] =
ZLayer.succeed(ElasticConfig.Default) >>> live

private[elasticsearch] def execute[A](request: ElasticRequest[A, _]): RIO[ElasticExecutor, A] =
private[elasticsearch] def execute[A](request: ElasticRequest[A]): RIO[ElasticExecutor, A] =
ZIO.serviceWithZIO[ElasticExecutor](_.execute(request))

private[elasticsearch] def stream(request: GetByQuery): ZStream[ElasticExecutor, Throwable, Item] =
ZStream.serviceWithStream[ElasticExecutor](_.stream(request))

private[elasticsearch] def streamAs[A: Schema](request: GetByQuery): ZStream[ElasticExecutor, Throwable, A] =
ZStream.serviceWithStream[ElasticExecutor](_.streamAs[A](request))
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import zio.json.ast.Json
import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField}

private[elasticsearch] final case class ElasticQueryResponse(
@jsonField("_scroll_id")
scrollId: Option[String],
took: Int,
@jsonField("timed_out")
timedOut: Boolean,
Expand Down Expand Up @@ -50,7 +52,7 @@ private[elasticsearch] final case class Hits(
total: Total,
@jsonField("max_score")
maxScore: Option[Double] = None,
hits: List[Item]
hits: List[Hit]
)

private[elasticsearch] object Hits {
Expand All @@ -63,7 +65,7 @@ private[elasticsearch] object Total {
implicit val decoder: JsonDecoder[Total] = DeriveJsonDecoder.gen[Total]
}

private[elasticsearch] final case class Item(
private[elasticsearch] final case class Hit(
@jsonField("_index")
index: String,
@jsonField("_type")
Expand All @@ -76,6 +78,6 @@ private[elasticsearch] final case class Item(
source: Json
)

private[elasticsearch] object Item {
implicit val decoder: JsonDecoder[Item] = DeriveJsonDecoder.gen[Item]
private[elasticsearch] object Hit {
implicit val decoder: JsonDecoder[Hit] = DeriveJsonDecoder.gen[Hit]
}
Loading