ZIO Elasticsearch is a type-safe and streaming-friendly ZIO native Elasticsearch client.
The library depends on sttp as an HTTP client for executing requests, and other ZIO libraries such as ZIO Schema and ZIO Prelude.
The following versions are supported:
- Scala: 2.12, 2.13 and 3
- ZIO: 2
- Elasticsearch: 7
- JVM: 11+
To use ZIO Elasticsearch in your project, add the following line to your build.sbt file:
libraryDependencies += "io.lambdaworks" %% "zio-elasticsearch" % "0.2.0"
In order to execute an Elasticsearch request we can rely on the Elasticsearch layer, which offers an execute method accepting an ElasticRequest. In order to build the Elasticsearch layer we need to provide the following layers:
- ElasticExecutor: if you provide ElasticExecutor.local, it will run on localhost:9200. Otherwise, if you want to use ElasticExecutor.live, you must also provide ElasticConfig.
- HttpClientZioBackend
import sttp.client3.httpclient.zio.HttpClientZioBackend
import zio.elasticsearch._
import zio._

object ZIOElasticsearchExample extends ZIOAppDefault {
  val indexName = IndexName("index")
  val result: RIO[Elasticsearch, CreationOutcome] =
    Elasticsearch.execute(ElasticRequest.createIndex(indexName))

  override def run =
    result.provide(
      ElasticExecutor.local,
      Elasticsearch.layer,
      HttpClientZioBackend.layer()
    )
}
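For a cluster that is not on localhost:9200, the same program can be wired with ElasticExecutor.live instead. The following is only a sketch: it assumes that ElasticConfig can be constructed from a host and a port, and the host name used here is a placeholder, so check the constructor against the version of the library you are using.

```scala
import sttp.client3.httpclient.zio.HttpClientZioBackend
import zio.elasticsearch._
import zio._

object ZIOElasticsearchLiveExample extends ZIOAppDefault {
  val indexName = IndexName("index")

  val result: RIO[Elasticsearch, CreationOutcome] =
    Elasticsearch.execute(ElasticRequest.createIndex(indexName))

  // Assumption: ElasticConfig takes a host and a port.
  // "elasticsearch.internal" is a hypothetical host name.
  val config: ULayer[ElasticConfig] =
    ZLayer.succeed(ElasticConfig("elasticsearch.internal", 9200))

  override def run =
    result.provide(
      ElasticExecutor.live, // requires ElasticConfig in addition to the HTTP backend
      Elasticsearch.layer,
      HttpClientZioBackend.layer(),
      config
    )
}
```

Keeping the configuration in its own layer means it can later be replaced, for example by one loaded from environment variables, without touching the rest of the wiring.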
The library uses ZIO Prelude's Newtype for IndexName, DocumentId and Routing in order to preserve type-safety.
val indexName: IndexName = IndexName("index")
val documentId: DocumentId = DocumentId("documentId")
To provide type-safety in your Elasticsearch requests, ZIO Elasticsearch uses ZIO Schema. Here is an example of creating a schema for the custom type User and using an implicit schema to create accessors that result in type-safe requests.
import zio.elasticsearch._
import zio.schema.{DeriveSchema, Schema}

final case class Address(street: String, number: Int)

object Address {
  implicit val schema: Schema.CaseClass2[String, Int, Address] =
    DeriveSchema.gen[Address]

  // One accessor per field, in declaration order
  val (street, number) = schema.makeAccessors(FieldAccessorBuilder)
}

final case class User(id: Int, address: Address)

object User {
  // The type parameters follow the field types: id is an Int, address is an Address
  implicit val schema: Schema.CaseClass2[Int, Address, User] =
    DeriveSchema.gen[User]

  val (id, address) = schema.makeAccessors(FieldAccessorBuilder)
}
val query: BoolQuery[User] =
  ElasticQuery
    .must(ElasticQuery.range(User.id).gte(7).lt(10))
    .should(ElasticQuery.startsWith(User.address / Address.street, "ZIO"))

val aggregation: TermsAggregation =
  ElasticAggregation
    .termsAggregation("termsAgg", User.address / Address.street)

val request: SearchAndAggregateRequest =
  ElasticRequest
    .search(IndexName("index"), query)
    .aggregate(aggregation)

val result: RIO[Elasticsearch, SearchResult] = Elasticsearch.execute(request)
ZIO Elasticsearch requests and queries offer a fluent API, allowing us to provide optional parameters in chained method calls for each request or query.
For example, if we wanted to add routing and refresh parameters to a deleteById request:
ElasticRequest.deleteById(IndexName("index"), DocumentId("documentId")).routing(Routing("routing")).refreshTrue
Complex queries can be created in the following manner:
ElasticQuery.must(ElasticQuery.range("version").gte(7).lt(10)).should(ElasticQuery.startsWith("name", "ZIO"))
If we want to specify lower and upper bounds for a range query, we can do the following:
ElasticQuery.range(User.id).gte(18).lt(100)
ZIO Elasticsearch requests like Create, CreateOrUpdate, CreateWithId, and DeleteById are bulkable requests.
For bulkable requests, you can use the bulk API, which accepts request types that inherit the Bulkable trait.
ElasticRequest.bulk(
  ElasticRequest.create[User](indexName, User(1, Address("Main Street", 1))),
  ElasticRequest.create[User](indexName, DocumentId("documentId2"), User(2, Address("Main Street", 2))),
  ElasticRequest.upsert[User](indexName, DocumentId("documentId3"), User(3, Address("Main Street", 3))),
  ElasticRequest.deleteById(indexName, DocumentId("documentId2"))
)
ZIO Elasticsearch is a streaming-friendly library, and it provides specific APIs for creating ZIO streams. When using the stream API, the result will be an Item, which is a case class that contains only one field, raw, that represents your response as raw JSON. Additionally, it is important to note that you can use StreamConfig to customize your settings when creating a stream. If you don't use StreamConfig, the default settings (StreamConfig.Default) will be used.
val request: SearchRequest =
  ElasticRequest.search(IndexName("index"), ElasticQuery.range(User.id).gte(5))

val defaultStream: ZStream[Elasticsearch, Throwable, Item] =
  Elasticsearch.stream(request)

val scrollStream: ZStream[Elasticsearch, Throwable, Item] =
  Elasticsearch.stream(request, StreamConfig.Scroll)

val searchAfterStream: ZStream[Elasticsearch, Throwable, User] =
  Elasticsearch.streamAs[User](request, StreamConfig.SearchAfter)
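The streams above are only descriptions; nothing is fetched until they are run. As a rough sketch of how one might consume such a stream, the example below collects a typed stream into a Chunk with the standard ZStream runCollect operator and prints its size. Person is a hypothetical document type introduced only for this example, and the sketch assumes that a derived Schema[Person] is sufficient for streamAs.

```scala
import sttp.client3.httpclient.zio.HttpClientZioBackend
import zio.elasticsearch._
import zio.schema.{DeriveSchema, Schema}
import zio._

// Hypothetical document type, used only for this sketch.
final case class Person(id: Int, name: String)

object Person {
  implicit val schema: Schema[Person] = DeriveSchema.gen[Person]
}

object ZIOElasticsearchStreamExample extends ZIOAppDefault {
  // Same shape of request as above, using a plain field name instead of an accessor.
  val request =
    ElasticRequest.search(IndexName("index"), ElasticQuery.range("id").gte(5))

  // runCollect pulls the whole stream into memory, so it only suits small result sets.
  val people: RIO[Elasticsearch, Chunk[Person]] =
    Elasticsearch.streamAs[Person](request, StreamConfig.SearchAfter).runCollect

  override def run =
    people
      .flatMap(ps => Console.printLine(s"Fetched ${ps.size} documents"))
      .provide(
        ElasticExecutor.local,
        Elasticsearch.layer,
        HttpClientZioBackend.layer()
      )
}
```

For large indices, processing elements incrementally (for example with runForeach or a ZSink) keeps memory use bounded, which is the main reason to prefer the stream API over a single search request.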
For a full-fledged example using this library, you can check out the example module, which contains an application with both a description and instructions on how to run it.
Learn more on the ZIO Elasticsearch homepage!
For the general guidelines, see the ZIO Elasticsearch contributor's guide.
See the Code of Conduct.