Skip to content

Commit

Permalink
Follow Open Telemetry Semantic Conventions
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Nov 1, 2024
1 parent 1c1aa00 commit b31e25f
Showing 1 changed file with 43 additions and 41 deletions.
84 changes: 43 additions & 41 deletions modules/elastic/src/main/scala/ESClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,37 @@ trait ESClient[F[_]]:

object ESClient:

object MetricKeys:
val dbCollectionName = AttributeKey.string("db.collection.name")
val dbBatchSize = AttributeKey.long("db.operation.batch.size")
val dbOperationName = AttributeKey.string("db.operation.name")
val errorType = AttributeKey.string("error.type")

import lila.search.ESClient.MetricKeys.*
private def withErrorType(static: Attributes)(ec: Resource.ExitCase): Attributes = ec match
case Resource.ExitCase.Succeeded =>
static
case Resource.ExitCase.Errored(e) =>
static.added(Attribute(errorType, e.getClass.getName))
case Resource.ExitCase.Canceled =>
static.added(Attribute(errorType, "canceled"))

def apply(uri: String)(using meter: Meter[IO]): Resource[IO, ESClient[IO]] =
Resource
.make(IO(ElasticClient(JavaClient(ElasticProperties(uri)))))(client => IO(client.close()))
.evalMap: esClient =>
meter
.histogram[Double]("client.duration")
.histogram[Double]("db.client.operation.duration")
.withUnit("ms")
.create
.map(apply(esClient))
.map(
apply(
esClient,
Attributes(Attribute("db.system", "elasticsearch"), Attribute("server.address", uri))
)
)

def apply[F[_]: MonadCancelThrow: Functor: Executor](client: ElasticClient)(
def apply[F[_]: MonadCancelThrow: Functor: Executor](client: ElasticClient, baseAttributes: Attributes)(
metric: Histogram[F, Double]
) = new ESClient[F]:

Expand All @@ -55,33 +75,20 @@ object ESClient:
.flatMap(toResult)
.map(_.status)

private def withErrorType(static: Attributes)(ec: Resource.ExitCase) = ec match
case Resource.ExitCase.Succeeded =>
static
case Resource.ExitCase.Errored(e) =>
static.added(Attribute("error.type", e.getClass.getName))
case Resource.ExitCase.Canceled =>
static.added(Attribute("error.type", "canceled"))

private def toResult[A](response: Response[A]): F[A] =
response.fold(MonadThrow[F].raiseError[A](response.error.asException))(MonadThrow[F].pure)

private def unitOrFail[A](response: Response[A]): F[Unit] =
response.fold(MonadThrow[F].raiseError[Unit](response.error.asException))(_ => MonadThrow[F].unit)

val indexAttributeKey = AttributeKey.string("index")
val sizeAttributeKey = AttributeKey.long("size")
val opAttributeKey = AttributeKey.string("name")

def search[A](query: A, from: From, size: Size)(using q: Queryable[A]): F[List[Id]] =
metric
.recordDuration(
TimeUnit.MILLISECONDS,
withErrorType(
Attributes(
Attribute(opAttributeKey, "search"),
Attribute(indexAttributeKey, q.index(query).value)
)
baseAttributes
.added(Attribute(dbOperationName, "search"))
.added(Attribute(dbCollectionName, q.index(query).value))
)
)
.surround:
Expand All @@ -95,10 +102,9 @@ object ESClient:
.recordDuration(
TimeUnit.MILLISECONDS,
withErrorType(
Attributes(
Attribute(opAttributeKey, "count"),
Attribute(indexAttributeKey, q.index(query).value)
)
baseAttributes
.added(Attribute(dbOperationName, "count"))
.added(Attribute(dbCollectionName, q.index(query).value))
)
)
.surround:
Expand All @@ -112,10 +118,9 @@ object ESClient:
.recordDuration(
TimeUnit.MILLISECONDS,
withErrorType(
Attributes(
Attribute(opAttributeKey, "store-one"),
Attribute(indexAttributeKey, index.value)
)
baseAttributes
.added(Attribute(dbOperationName, "store"))
.added(Attribute(dbCollectionName, index.value))
)
)
.surround:
Expand All @@ -130,11 +135,10 @@ object ESClient:
.recordDuration(
TimeUnit.MILLISECONDS,
withErrorType(
Attributes(
Attribute(opAttributeKey, "store-bulk"),
Attribute(indexAttributeKey, index.value),
Attribute(sizeAttributeKey, objs.size)
)
baseAttributes
.added(Attribute(dbOperationName, "store-bulk"))
.added(Attribute(dbCollectionName, index.value))
.added(Attribute(dbBatchSize, objs.size))
)
)
.surround:
Expand All @@ -148,10 +152,9 @@ object ESClient:
.recordDuration(
TimeUnit.MILLISECONDS,
withErrorType(
Attributes(
Attribute(opAttributeKey, "delete-one"),
Attribute(indexAttributeKey, index.value)
)
baseAttributes
.added(Attribute(dbOperationName, "delete-one"))
.added(Attribute(dbCollectionName, index.value))
)
)
.surround:
Expand All @@ -164,11 +167,10 @@ object ESClient:
.recordDuration(
TimeUnit.MILLISECONDS,
withErrorType(
Attributes(
Attribute(opAttributeKey, "delete-bulk"),
Attribute(indexAttributeKey, index.value),
Attribute(sizeAttributeKey, ids.size)
)
baseAttributes
.added(Attribute(dbOperationName, "delete-bulk"))
.added(Attribute(dbCollectionName, index.value))
.added(Attribute(dbBatchSize, ids.size))
)
)
.surround:
Expand Down

0 comments on commit b31e25f

Please sign in to comment.