From 20315aedea3c8c2953cfd5ee391feb10fbc1146c Mon Sep 17 00:00:00 2001 From: shotexa Date: Tue, 5 Sep 2023 20:17:44 +0400 Subject: [PATCH] feat(prism-agent): metrics for issuance flow (#669) Signed-off-by: Shota Jolbordi --- .../iohk/atala/mercury/MessagingService.scala | 2 +- .../core/service/CredentialServiceImpl.scala | 28 ++- .../agent/server/jobs/BackgroundJobs.scala | 207 ++++++++++++++++-- .../agent/walletapi/util/SeedResolver.scala | 3 +- .../utils/aspects/CustomMetricsAspect.scala | 1 + 5 files changed, 214 insertions(+), 27 deletions(-) diff --git a/mercury/mercury-library/agent-didcommx/src/main/scala/io/iohk/atala/mercury/MessagingService.scala b/mercury/mercury-library/agent-didcommx/src/main/scala/io/iohk/atala/mercury/MessagingService.scala index b97363a307..e0dba38d3e 100644 --- a/mercury/mercury-library/agent-didcommx/src/main/scala/io/iohk/atala/mercury/MessagingService.scala +++ b/mercury/mercury-library/agent-didcommx/src/main/scala/io/iohk/atala/mercury/MessagingService.scala @@ -123,7 +123,7 @@ object MessagingService { else didCommService.packEncrypted(msg = finalMessage, to = finalMessage.to.head) // TODO Head - _ <- ZIO.log(s"Sending to Message to '$serviceEndpoint'") + _ <- ZIO.log(s"Sending a Message to '$serviceEndpoint'") resp <- HttpClient .postDIDComm(url = serviceEndpoint, data = encryptedMessage.string) .catchAll { case ex => ZIO.fail(SendMessageError(ex, Some(encryptedMessage.string))) } diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceImpl.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceImpl.scala index e78ecc0990..b7df729dda 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceImpl.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceImpl.scala @@ -18,6 +18,7 @@ import io.iohk.atala.pollux.core.model.schema.CredentialSchema import io.iohk.atala.pollux.core.repository.CredentialRepository import io.iohk.atala.pollux.vc.jwt.* import io.iohk.atala.prism.crypto.{MerkleInclusionProof, MerkleTreeKt, Sha256} +import io.iohk.atala.shared.utils.aspects.CustomMetricsAspect import zio.* import zio.prelude.ZValidation @@ -138,7 +139,8 @@ private class CredentialServiceImpl( case 1 => ZIO.succeed(()) case n => ZIO.fail(UnexpectedException(s"Invalid row count result: $n")) } - .mapError(RepositoryError.apply) + .mapError(RepositoryError.apply) @@ CustomMetricsAspect + .startRecordingTime(s"${record.id}_issuer_offer_pending_to_sent_ms_gauge") } yield record } @@ -214,7 +216,9 @@ private class CredentialServiceImpl( record <- getRecordWithState(recordId, ProtocolState.OfferReceived) count <- credentialRepository .updateWithSubjectId(recordId, subjectId, ProtocolState.RequestPending) - .mapError(RepositoryError.apply) + .mapError(RepositoryError.apply) @@ CustomMetricsAspect.startRecordingTime( + s"${record.id}_issuance_flow_holder_req_pending_to_generated" + ) _ <- count match case 1 => ZIO.succeed(()) case n => ZIO.fail(RecordIdNotFound(recordId)) @@ -261,7 +265,10 @@ private class CredentialServiceImpl( request = createDidCommRequestCredential(offer, signedPresentation) count <- credentialRepository .updateWithRequestCredential(recordId, request, ProtocolState.RequestGenerated) - .mapError(RepositoryError.apply) + .mapError(RepositoryError.apply) @@ CustomMetricsAspect.endRecordingTime( + s"${record.id}_issuance_flow_holder_req_pending_to_generated", + "issuance_flow_holder_req_pending_to_generated_ms_gauge" + ) @@ CustomMetricsAspect.startRecordingTime(s"${record.id}_issuance_flow_holder_req_generated_to_sent") _ <- count match case 1 => ZIO.succeed(()) case n => ZIO.fail(RecordIdNotFound(recordId)) @@ -308,7 +315,9 @@ private class CredentialServiceImpl( issue = createDidCommIssueCredential(request) count <- credentialRepository .updateWithIssueCredential(recordId, issue, ProtocolState.CredentialPending) - .mapError(RepositoryError.apply) + .mapError(RepositoryError.apply) @@ CustomMetricsAspect.startRecordingTime( + s"${record.id}_issuance_flow_issuer_credential_pending_to_generated" + ) _ <- count match case 1 => ZIO.succeed(()) case n => ZIO.fail(RecordIdNotFound(recordId)) @@ -356,6 +365,9 @@ private class CredentialServiceImpl( recordId, IssueCredentialRecord.ProtocolState.RequestGenerated, IssueCredentialRecord.ProtocolState.RequestSent + ) @@ CustomMetricsAspect.endRecordingTime( + s"${recordId}_issuance_flow_holder_req_generated_to_sent", + "issuance_flow_holder_req_generated_to_sent_ms_gauge" ) override def markCredentialGenerated( @@ -370,7 +382,10 @@ private class CredentialServiceImpl( issueCredential, IssueCredentialRecord.ProtocolState.CredentialGenerated ) - .mapError(RepositoryError.apply) + .mapError(RepositoryError.apply) @@ CustomMetricsAspect.endRecordingTime( + s"${record.id}_issuance_flow_issuer_credential_pending_to_generated", + "issuance_flow_issuer_credential_pending_to_generated_ms_gauge" + ) @@ CustomMetricsAspect.startRecordingTime(s"${record.id}_issuance_flow_issuer_credential_generated_to_sent") _ <- count match case 1 => ZIO.succeed(()) case n => ZIO.fail(RecordIdNotFound(recordId)) @@ -390,6 +405,9 @@ private class CredentialServiceImpl( recordId, IssueCredentialRecord.ProtocolState.CredentialGenerated, IssueCredentialRecord.ProtocolState.CredentialSent + ) @@ CustomMetricsAspect.endRecordingTime( + s"${recordId}_issuance_flow_issuer_credential_generated_to_sent", + "issuance_flow_issuer_credential_generated_to_sent_ms_gauge" ) override def markCredentialPublicationPending( diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobs.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobs.scala index fd43e5b385..b2b38f0ba1 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobs.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobs.scala @@ -32,9 +32,13 @@ import io.iohk.atala.pollux.vc.jwt.{ DidResolver as JwtDidResolver, Issuer as JwtIssuer } +import io.iohk.atala.shared.utils.aspects.CustomMetricsAspect import zio.* import zio.prelude.ZValidation.* import zio.prelude.Validation +import zio.metrics.* +import io.iohk.atala.shared.utils.DurationOps.toMetricsSeconds + import java.time.{Clock, Instant, ZoneId} object BackgroundJobs { @@ -84,6 +88,101 @@ object BackgroundJobs { import IssueCredentialRecord.* import IssueCredentialRecord.ProtocolState.* import IssueCredentialRecord.PublicationState.* + + def counterMetric(key: String) = Metric + .counterInt(key) + .fromConst(1) + + val IssuerSendOfferMsgFailed = counterMetric( + "issuance_flow_issuer_send_offer_msg_failed_counter" + ) + val IssuerSendOfferMsgSucceed = counterMetric( + "issuance_flow_issuer_send_offer_msg_succeed_counter" + ) + + val IssuerSendOfferSucceed = counterMetric( + "issuance_flow_issuer_send_offer_flow_succeed_counter" + ) + + val IssuerSendOfferFailed = counterMetric( + "issuance_flow_issuer_send_offer_flow_failed_counter" + ) + + val IssuerSendOfferAll = counterMetric( + "issuance_flow_issuer_send_offer_flow_all_counter" + ) + + val HolderPendingToGeneratedSuccess = counterMetric( + "issuance_flow_holder_req_pending_to_generated_flow_success_counter" + ) + + val HolderPendingToGeneratedFailed = counterMetric( + "issuance_flow_holder_req_pending_to_generated_flow_failed_counter" + ) + + val HolderPendingToGeneratedAll = counterMetric( + "issuance_flow_holder_req_pending_to_generated_flow_all_counter" + ) + + val HolderSendReqSucceed = counterMetric( + "issuance_flow_holder_send_req_msg_succeed_counter" + ) + + val HolderSendReqFailed = counterMetric( + "issuance_flow_holder_send_req_msg_failed_counter" + ) + + val HolderGeneratedToSentSucceed = counterMetric( + "issuance_flow_holder_req_generated_to_sent_flow_success_counter" + ) + val HolderGeneratedToSentFailed = counterMetric( + "issuance_flow_holder_req_generated_to_sent_flow_failed_counter" + ) + val HolderGeneratedToSentAll = counterMetric( + "issuance_flow_holder_req_generated_to_sent_flow_all_counter" + ) + + val IssuerReceivedToPendingSuccess = counterMetric( + "issuance_flow_issuer_cred_received_to_pending_flow_success_counter" + ) + + val IssuerReceivedToPendingFailed = counterMetric( + "issuance_flow_issuer_cred_received_to_pending_flow_failed_counter" + ) + + val IssuerReceivedToPendingAll = counterMetric( + "issuance_flow_issuer_cred_received_to_pending_flow_all_counter" + ) + + val IssuerPendingToGeneratedSuccess = counterMetric( + "issuance_flow_issuer_cred_pending_to_generated_flow_success_counter" + ) + val IssuerPendingToGeneratedFailed = counterMetric( + "issuance_flow_issuer_cred_pending_to_generated_flow_failed_counter" + ) + val IssuerPendingToGeneratedAll = counterMetric( + "issuance_flow_issuer_cred_pending_to_generated_flow_all_counter" + ) + + val IssuerSendCredentialSuccess = counterMetric( + "issuance_flow_issuer_send_cred_success_counter" + ) + + val IssuerSendCredentialFailed = counterMetric( + "issuance_flow_issuer_send_cred_failed_counter" + ) + + val IssuerSendCredentialAll = counterMetric( + "issuance_flow_issuer_send_cred_all_counter" + ) + + val IssuerSendCredentialMsgFailed = counterMetric( + "issuance_flow_issuer_send_credential_msg_failed_counter" + ) + val IssuerSendCredentialMsgSuccess = counterMetric( + "issuance_flow_issuer_send_credential_msg_succeed_counter" + ) + val aux = for { _ <- ZIO.logDebug(s"Running action with records => $record") _ <- record match { @@ -110,19 +209,34 @@ object BackgroundJobs { _, _, ) => - for { + val sendOfferFlow = for { _ <- ZIO.log(s"IssueCredentialRecord: OfferPending (START)") didCommAgent <- buildDIDCommAgent(offer.from) resp <- MessagingService .send(offer.makeMessage) - .provideSomeLayer(didCommAgent) + .provideSomeLayer(didCommAgent) @@ Metric + .gauge("issuance_flow_issuer_send_offer_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) credentialService <- ZIO.service[CredentialService] _ <- { - if (resp.status >= 200 && resp.status < 300) credentialService.markOfferSent(id) - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) + if (resp.status >= 200 && resp.status < 300) + credentialService.markOfferSent(id) @@ + IssuerSendOfferMsgSucceed @@ + CustomMetricsAspect.endRecordingTime( + s"${record.id}_issuer_offer_pending_to_sent_ms_gauge", + "issuance_flow_issuer_offer_pending_to_sent_ms_gauge" + ) + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ IssuerSendOfferMsgFailed } } yield () + sendOfferFlow @@ IssuerSendOfferSucceed.trackSuccess + @@ IssuerSendOfferFailed.trackError + @@ IssuerSendOfferAll + @@ Metric + .gauge("issuance_flow_issuer_send_offer_flow_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + // Request should be sent from Holder to Issuer case IssueCredentialRecord( id, @@ -146,7 +260,7 @@ object BackgroundJobs { _, _ ) => - for { + val holderPendingToGeneratedFlow = for { credentialService <- ZIO.service[CredentialService] subjectDID <- ZIO .fromEither(PrismDID.fromString(subjectId)) @@ -158,6 +272,13 @@ object BackgroundJobs { _ <- credentialService.generateCredentialRequest(id, signedPayload) } yield () + holderPendingToGeneratedFlow @@ HolderPendingToGeneratedSuccess.trackSuccess + @@ HolderPendingToGeneratedFailed.trackError + @@ HolderPendingToGeneratedAll + @@ Metric + .gauge("issuance_flow_holder_req_pending_to_generated_flow_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + // Request should be sent from Holder to Issuer case IssueCredentialRecord( id, @@ -181,18 +302,32 @@ object BackgroundJobs { _, _ ) => - for { + val holderGeneratedToSentFlow = for { didCommAgent <- buildDIDCommAgent(request.from) resp <- MessagingService .send(request.makeMessage) - .provideSomeLayer(didCommAgent) + .provideSomeLayer(didCommAgent) @@ Metric + .gauge("issuance_flow_holder_send_request_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) credentialService <- ZIO.service[CredentialService] _ <- { - if (resp.status >= 200 && resp.status < 300) credentialService.markRequestSent(id) - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) + if (resp.status >= 200 && resp.status < 300) + credentialService.markRequestSent(id) @@ HolderSendReqSucceed + @@ CustomMetricsAspect.endRecordingTime( + s"${record.id}_issuance_flow_holder_req_generated_to_sent", + "issuance_flow_holder_req_generated_to_sent_ms_gauge" + ) + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ HolderSendReqFailed } } yield () + holderGeneratedToSentFlow @@ HolderGeneratedToSentSucceed.trackSuccess + @@ HolderGeneratedToSentFailed.trackError + @@ HolderGeneratedToSentAll + @@ Metric + .gauge("issuance_flow_holder_req_generated_to_sent_flow_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + // 'automaticIssuance' is TRUE. Issuer automatically accepts the Request case IssueCredentialRecord( id, @@ -216,11 +351,18 @@ object BackgroundJobs { _, _, ) => - for { + val issuerReceivedToPendingFlow = for { credentialService <- ZIO.service[CredentialService] _ <- credentialService.acceptCredentialRequest(id) } yield () + issuerReceivedToPendingFlow @@ IssuerReceivedToPendingSuccess.trackSuccess + @@ IssuerReceivedToPendingFailed.trackError + @@ IssuerReceivedToPendingAll + @@ Metric + .gauge("issuance_flow_issuer_cred_pending_to_generated_flow_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + // Credential is pending, can be generated by Issuer and optionally published on-chain case IssueCredentialRecord( id, @@ -244,10 +386,10 @@ object BackgroundJobs { _, _, ) => - // Generate the JWT Credential and store it in DB as an attacment to IssueCredentialData + // Generate the JWT Credential and store it in DB as an attachment to IssueCredentialData // Set ProtocolState to CredentialGenerated // Set PublicationState to PublicationPending - for { + val issuerPendingToGeneratedFlow = for { credentialService <- ZIO.service[CredentialService] longFormPrismDID <- getLongForm(issuerDID, true) jwtIssuer <- createJwtIssuer(longFormPrismDID, VerificationRelationship.AssertionMethod) @@ -267,6 +409,13 @@ object BackgroundJobs { } yield () + issuerPendingToGeneratedFlow @@ IssuerPendingToGeneratedSuccess.trackSuccess + @@ IssuerPendingToGeneratedFailed.trackError + @@ IssuerPendingToGeneratedAll + @@ Metric + .gauge("issuance_flow_issuer_cred_received_to_pending_flow_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + // Credential has been generated and can be sent directly to the Holder case IssueCredentialRecord( id, @@ -290,18 +439,27 @@ object BackgroundJobs { _, _, ) => - for { + val sendCredentialManualIssuanceFlow = for { didCommAgent <- buildDIDCommAgent(issue.from) resp <- MessagingService .send(issue.makeMessage) - .provideSomeLayer(didCommAgent) + .provideSomeLayer(didCommAgent) @@ Metric + .gauge("issuance_flow_issuer_send_credential_msg_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) credentialService <- ZIO.service[CredentialService] _ <- { - if (resp.status >= 200 && resp.status < 300) credentialService.markCredentialSent(id) - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) + if (resp.status >= 200 && resp.status < 300) + credentialService.markCredentialSent(id) @@ IssuerSendCredentialMsgSuccess + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ IssuerSendCredentialMsgFailed } } yield () + sendCredentialManualIssuanceFlow @@ IssuerSendCredentialSuccess.trackSuccess + @@ IssuerSendCredentialFailed.trackError + @@ IssuerSendCredentialAll @@ Metric + .gauge("issuance_flow_issuer_send_cred_flow_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + // Credential has been generated, published, and can now be sent to the Holder case IssueCredentialRecord( id, @@ -325,16 +483,25 @@ object BackgroundJobs { _, _ ) => - for { + val sendCredentialAutomaticIssuanceFlow = for { didCommAgent <- buildDIDCommAgent(issue.from) - resp <- MessagingService.send(issue.makeMessage).provideSomeLayer(didCommAgent) + resp <- MessagingService.send(issue.makeMessage).provideSomeLayer(didCommAgent) @@ Metric + .gauge("issuance_flow_issuer_send_credential_msg_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) credentialService <- ZIO.service[CredentialService] _ <- { - if (resp.status >= 200 && resp.status < 300) credentialService.markCredentialSent(id) - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) + if (resp.status >= 200 && resp.status < 300) + credentialService.markCredentialSent(id) @@ IssuerSendCredentialMsgSuccess + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ IssuerSendCredentialMsgFailed } } yield () + sendCredentialAutomaticIssuanceFlow @@ IssuerSendCredentialSuccess.trackSuccess + @@ IssuerSendCredentialFailed.trackError + @@ IssuerSendCredentialAll @@ Metric + .gauge("issuance_flow_issuer_send_cred_flow_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + case IssueCredentialRecord(id, _, _, _, _, _, _, _, _, _, ProblemReportPending, _, _, _, _, _, _, _, _, _) => ??? case IssueCredentialRecord(id, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _) => ZIO.unit diff --git a/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/util/SeedResolver.scala b/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/util/SeedResolver.scala index 7c7997edef..eba08156e3 100644 --- a/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/util/SeedResolver.scala +++ b/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/util/SeedResolver.scala @@ -17,9 +17,10 @@ private class SeedResolverImpl(apollo: Apollo, isDevMode: Boolean) extends SeedR override def resolve: Task[Array[Byte]] = { val seedEnv = for { - _ <- ZIO.logInfo("Resolving a wallet seed using WALLET_SEED environemnt variable") + _ <- ZIO.logInfo("Resolving a wallet seed using WALLET_SEED environment variable") maybeSeed <- System .env("WALLET_SEED") + .tapSome(seed => ZIO.logInfo(s"got wallet seed - ${seed}")) .flatMap { case Some(hex) => ZIO.fromTry(HexString.fromString(hex)).map(_.toByteArray).asSome case None => ZIO.none diff --git a/shared/src/main/scala/io/iohk/atala/shared/utils/aspects/CustomMetricsAspect.scala b/shared/src/main/scala/io/iohk/atala/shared/utils/aspects/CustomMetricsAspect.scala index bc9c55e268..6c5515b2fc 100644 --- a/shared/src/main/scala/io/iohk/atala/shared/utils/aspects/CustomMetricsAspect.scala +++ b/shared/src/main/scala/io/iohk/atala/shared/utils/aspects/CustomMetricsAspect.scala @@ -36,6 +36,7 @@ object CustomMetricsAspect { res <- zio end <- now maybeStart = checkpoints.get(key) + _ = checkpoints.remove(key) metricsZio = maybeStart.map(start => Duration.between(start, end)).fold(ZIO.unit) { duration => ZIO.succeed(duration.toMetricsSeconds) @@ Metric.gauge(metricsKey).tagged(tags) }