Skip to content

Commit

Permalink
Add logs to HttpElasticExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
dbulaja98 committed Dec 6, 2022
1 parent 5221b76 commit 4dae96c
Showing 1 changed file with 43 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import sttp.model.MediaType.ApplicationJson
import sttp.model.StatusCode.Ok
import sttp.model.Uri
import zio.Task
import zio.ZIO.{logError, logInfo}
import zio.elasticsearch.ElasticRequest._

private[elasticsearch] final class HttpElasticExecutor private (config: ElasticConfig, client: SttpBackend[Task, Any])
Expand All @@ -29,12 +30,16 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC

private def executeGetById(r: GetById): Task[Option[Document]] = {
val uri = uri"$basePath/${r.index}/$Doc/${r.id}".withParam("routing", r.routing.map(Routing.unwrap))
request
.get(uri)
.response(asJson[ElasticGetResponse])
.send(client)
.map(_.body.toOption)
.map(_.flatMap(d => if (d.found) Some(Document.from(d.source)) else None))

for {
_ <- logInfo(s"Executing get document by id: ${r.id}...")
maybeDocument <- request
.get(uri)
.response(asJson[ElasticGetResponse])
.send(client)
.map(_.body.toOption)
.map(_.flatMap(d => if (d.found) Some(Document.from(d.source)) else None))
} yield maybeDocument
}

private def executeCreate(r: Create): Task[Option[DocumentId]] = {
Expand All @@ -45,27 +50,40 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
uri"$basePath/${r.index}/$Doc".withParam("routing", r.routing.map(Routing.unwrap))
}

request
.post(uri)
.contentType(ApplicationJson)
.response(asJson[ElasticCreateResponse])
.body(r.document.json)
.send(client)
.map(_.body.toOption)
.map(_.flatMap(body => DocumentId.make(body.id).toOption))
// for now, it is still "happy path", I don't know if there are any other ways for failure
for {
_ <- logInfo("Executing create document...")
maybeDocumentId <- request
.post(uri)
.contentType(ApplicationJson)
.response(asJson[ElasticCreateResponse])
.body(r.document.json)
.send(client)
.map(_.body.toOption)
.map(_.flatMap(body => DocumentId.make(body.id).toOption))
_ <- if (maybeDocumentId.isEmpty)
logError("Document could not be created - document with given id already exists.")
else logInfo("Document is successfully created.")
} yield maybeDocumentId
}

private def executeCreateIndex(createIndex: CreateIndex): Task[Unit] =
request
.put(uri"$basePath/${createIndex.name}")
.contentType(ApplicationJson)
.body(createIndex.definition.getOrElse(""))
.send(client)
.unit
for {
_ <- logInfo("Executing create index...")
_ <- request
.put(uri"$basePath/${createIndex.name}")
.contentType(ApplicationJson)
.body(createIndex.definition.getOrElse(""))
.send(client)
} yield ()

private def executeCreateOrUpdate(r: CreateOrUpdate): Task[Unit] = {
val uri = uri"$basePath/${r.index}/$Doc/${r.id}".withParam("routing", r.routing.map(Routing.unwrap))
request.put(uri).contentType(ApplicationJson).body(r.document.json).send(client).unit

for {
_ <- logInfo("Executing create or update document...")
_ <- request.put(uri).contentType(ApplicationJson).body(r.document.json).send(client)
} yield ()
}

private def executeExists(r: Exists): Task[Boolean] = {
Expand All @@ -74,7 +92,10 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
}

private def executeDeleteIndex(r: DeleteIndex): Task[Unit] =
request.delete(uri"$basePath/${r.name}").send(client).unit
for {
_ <- logInfo("Executing delete index...")
_ <- request.delete(uri"$basePath/${r.name}").send(client)
} yield ()

private def executeDeleteById(r: DeleteById): Task[Option[Unit]] = {
val uri = uri"$basePath/${r.index}/$Doc/${r.id}".withParam("routing", r.routing.map(Routing.unwrap))
Expand Down

0 comments on commit 4dae96c

Please sign in to comment.