Skip to content

Commit

Permalink
Expose execute through Elasticsearch trait
Browse files Browse the repository at this point in the history
  • Loading branch information
mvelimir committed Feb 22, 2023
1 parent 5ff9c38 commit e21e5b6
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 169 deletions.
17 changes: 9 additions & 8 deletions modules/example/src/main/scala/example/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import sttp.client3.SttpBackend
import sttp.client3.httpclient.zio.HttpClientZioBackend
import zio._
import zio.config.getConfig
import zio.elasticsearch.{ElasticConfig, ElasticExecutor, ElasticRequest}
import zio.elasticsearch.{ElasticConfig, ElasticExecutor, ElasticRequest, Elasticsearch}
import zio.http.{Server, ServerConfig}

import scala.io.Source
Expand All @@ -38,25 +38,26 @@ object Main extends ZIOAppDefault {
AppConfig.live,
elasticConfigLive,
ElasticExecutor.live,
Elasticsearch.layer,
HttpClientZioBackend.layer()
)
}

private[this] def prepare: RIO[SttpBackend[Task, Any] with ElasticExecutor, Unit] = {
val deleteIndex: RIO[ElasticExecutor, Unit] =
private[this] def prepare: RIO[SttpBackend[Task, Any] with Elasticsearch, Unit] = {
val deleteIndex: RIO[Elasticsearch, Unit] =
for {
_ <- ZIO.logInfo(s"Deleting index '$Index'...")
_ <- ElasticRequest.deleteIndex(Index).execute
_ <- Elasticsearch.execute(ElasticRequest.deleteIndex(Index))
} yield ()

val createIndex: RIO[ElasticExecutor, Unit] =
val createIndex: RIO[Elasticsearch, Unit] =
for {
_ <- ZIO.logInfo(s"Creating index '$Index'...")
mapping <- ZIO.fromTry(Using(Source.fromURL(getClass.getResource("/mapping.json")))(_.mkString))
_ <- ElasticRequest.createIndex(Index, Some(mapping)).execute
_ <- Elasticsearch.execute(ElasticRequest.createIndex(Index, Some(mapping)))
} yield ()

val populate: RIO[SttpBackend[Task, Any] with ElasticExecutor, Unit] =
val populate: RIO[SttpBackend[Task, Any] with Elasticsearch, Unit] =
(for {
repositories <- RepoFetcher.fetchAllByOrganization(organization)
_ <- ZIO.logInfo("Adding GitHub repositories...")
Expand All @@ -66,7 +67,7 @@ object Main extends ZIOAppDefault {
deleteIndex *> createIndex *> populate
}

private[this] def runServer: RIO[HttpConfig with ElasticExecutor, ExitCode] = {
private[this] def runServer: RIO[HttpConfig with Elasticsearch, ExitCode] = {
val serverConfigLive = ZLayer.fromFunction((http: HttpConfig) => ServerConfig.default.port(http.port))

(for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,30 @@ import zio.elasticsearch.{
CreationOutcome,
DeletionOutcome,
DocumentId,
ElasticExecutor,
ElasticQuery,
ElasticRequest,
Elasticsearch,
Routing
}
import zio.prelude.Newtype.unsafeWrap

final case class RepositoriesElasticsearch(executor: ElasticExecutor) {
final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) {

def findAll(): Task[List[GitHubRepo]] =
executor.execute(ElasticRequest.search[GitHubRepo](Index, matchAll))
elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, matchAll))

def findById(organization: String, id: String): Task[Option[GitHubRepo]] =
for {
routing <- routingOf(organization)
req = ElasticRequest.getById[GitHubRepo](Index, DocumentId(id)).routing(routing)
res <- executor.execute(req)
res <- elasticsearch.execute(req)
} yield res

def create(repository: GitHubRepo): Task[CreationOutcome] =
for {
routing <- routingOf(repository.organization)
req = ElasticRequest.create(Index, DocumentId(repository.id), repository).routing(routing).refreshTrue
res <- executor.execute(req)
res <- elasticsearch.execute(req)
} yield res

def createAll(repositories: List[GitHubRepo]): Task[Unit] =
Expand All @@ -55,25 +55,25 @@ final case class RepositoriesElasticsearch(executor: ElasticExecutor) {
ElasticRequest.create[GitHubRepo](Index, unsafeWrap(DocumentId)(repository.id), repository)
}
bulkReq = ElasticRequest.bulk(reqs: _*).routing(routing)
_ <- executor.execute(bulkReq)
_ <- elasticsearch.execute(bulkReq)
} yield ()

def upsert(id: String, repository: GitHubRepo): Task[Unit] =
for {
routing <- routingOf(repository.organization)
req = ElasticRequest.upsert(Index, DocumentId(id), repository).routing(routing).refresh(value = true)
_ <- executor.execute(req)
_ <- elasticsearch.execute(req)
} yield ()

def remove(organization: String, id: String): Task[DeletionOutcome] =
for {
routing <- routingOf(organization)
req = ElasticRequest.deleteById(Index, DocumentId(id)).routing(routing).refreshFalse
res <- executor.execute(req)
res <- elasticsearch.execute(req)
} yield res

def search(query: ElasticQuery[_]): Task[List[GitHubRepo]] =
executor.execute(ElasticRequest.search[GitHubRepo](Index, query))
elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, query))

private def routingOf(value: String): IO[IllegalArgumentException, Routing.Type] =
Routing.make(value).toZIO.mapError(e => new IllegalArgumentException(e))
Expand Down Expand Up @@ -103,6 +103,6 @@ object RepositoriesElasticsearch {
def search(query: ElasticQuery[_]): RIO[RepositoriesElasticsearch, List[GitHubRepo]] =
ZIO.serviceWithZIO[RepositoriesElasticsearch](_.search(query))

lazy val live: URLayer[ElasticExecutor, RepositoriesElasticsearch] =
lazy val live: URLayer[Elasticsearch, RepositoriesElasticsearch] =
ZLayer.fromFunction(RepositoriesElasticsearch(_))
}
Loading

0 comments on commit e21e5b6

Please sign in to comment.