diff --git a/prism-agent/service/project/Dependencies.scala b/prism-agent/service/project/Dependencies.scala index b516ee82dc..05a6aec450 100644 --- a/prism-agent/service/project/Dependencies.scala +++ b/prism-agent/service/project/Dependencies.scala @@ -9,11 +9,11 @@ object Dependencies { val akka = "2.6.20" val akkaHttp = "10.2.9" val castor = "0.8.0" - val pollux = "0.23.0" - val connect = "0.9.0" + val pollux = "0.24.0" + val connect = "0.10.0" val bouncyCastle = "1.70" val logback = "1.4.5" - val mercury = "0.17.0" + val mercury = "0.18.0" val zioJson = "0.3.0" val tapir = "1.2.3" val flyway = "9.8.3" diff --git a/prism-agent/service/server/src/main/resources/application.conf b/prism-agent/service/server/src/main/resources/application.conf index c850139f66..f302db57df 100644 --- a/prism-agent/service/server/src/main/resources/application.conf +++ b/prism-agent/service/server/src/main/resources/application.conf @@ -28,6 +28,7 @@ castor { username = ${?CASTOR_DB_USER} password = "postgres" password = ${?CASTOR_DB_PASSWORD} + awaitConnectionThreads = 8 } } @@ -43,7 +44,12 @@ pollux { username = ${?POLLUX_DB_USER} password = "postgres" password = ${?POLLUX_DB_PASSWORD} + awaitConnectionThreads = 8 } + issueBgJobRecurrenceDelay = 2 seconds + issueBgJobProcessingParallelism = 25 + presentationBgJobRecurrenceDelay = 2 seconds + presentationBgJobProcessingParallelism = 25 } connect { @@ -58,7 +64,10 @@ connect { username = ${?CONNECT_DB_USER} password = "postgres" password = ${?CONNECT_DB_PASSWORD} + awaitConnectionThreads = 8 } + connectBgJobRecurrenceDelay = 2 seconds + connectBgJobProcessingParallelism = 25 } agent { @@ -80,5 +89,6 @@ agent { username = ${?AGENT_DB_USER} password = "postgres" password = ${?AGENT_DB_PASSWORD} + awaitConnectionThreads = 8 } } diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala index fddf3846fb..d649f5efad 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala @@ -37,7 +37,7 @@ object Main extends ZIOAppDefault { } yield () def appComponents(didCommServicePort: Int, restServicePort: Int) = for { - _ <- Modules.didCommExchangesJob.debug.fork + _ <- Modules.issueCredentialDidCommExchangesJob.debug.fork _ <- Modules.presentProofExchangeJob.debug.fork _ <- Modules.connectDidCommExchangesJob.debug.fork _ <- Modules.didCommServiceEndpoint(didCommServicePort).debug.fork diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Modules.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Modules.scala index fab32a75a6..9bc800c98b 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Modules.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Modules.scala @@ -155,26 +155,38 @@ object Modules { Server.start(port, app) } - val didCommExchangesJob: RIO[ - DidOps & DIDResolver & JwtDidResolver & HttpClient & CredentialService & ManagedDIDService & DIDSecretStorage, + val issueCredentialDidCommExchangesJob: RIO[ + AppConfig & DidOps & DIDResolver & JwtDidResolver & HttpClient & CredentialService & ManagedDIDService & + DIDSecretStorage, Unit ] = - BackgroundJobs.didCommExchanges - .repeat(Schedule.spaced(10.seconds)) - .unit + for { + config <- ZIO.service[AppConfig] + job <- BackgroundJobs.issueCredentialDidCommExchanges + .repeat(Schedule.spaced(config.pollux.issueBgJobRecurrenceDelay)) + .unit + } yield job val presentProofExchangeJob: RIO[ - DidOps & DIDResolver & JwtDidResolver & HttpClient & PresentationService & ManagedDIDService & DIDSecretStorage, + AppConfig & DidOps & DIDResolver & JwtDidResolver & HttpClient & PresentationService & ManagedDIDService & + DIDSecretStorage, Unit ] = - BackgroundJobs.presentProofExchanges - .repeat(Schedule.spaced(10.seconds)) - .unit - - val connectDidCommExchangesJob: RIO[DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService, Unit] = - ConnectBackgroundJobs.didCommExchanges - .repeat(Schedule.spaced(10.seconds)) - .unit + for { + config <- ZIO.service[AppConfig] + job <- BackgroundJobs.presentProofExchanges + .repeat(Schedule.spaced(config.pollux.presentationBgJobRecurrenceDelay)) + .unit + } yield job + + val connectDidCommExchangesJob + : RIO[AppConfig & DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService, Unit] = + for { + config <- ZIO.service[AppConfig] + job <- ConnectBackgroundJobs.didCommExchanges + .repeat(Schedule.spaced(config.connect.connectBgJobRecurrenceDelay)) + .unit + } yield job val syncDIDPublicationStateFromDltJob: URIO[ManagedDIDService, Unit] = BackgroundJobs.syncDIDPublicationStateFromDlt @@ -523,7 +535,7 @@ object RepoModule { username = config.username, password = config.password, jdbcUrl = s"jdbc:postgresql://${config.host}:${config.port}/${config.databaseName}", - awaitConnectionThreads = 2 + awaitConnectionThreads = config.awaitConnectionThreads ) } } @@ -549,7 +561,7 @@ object RepoModule { username = config.username, password = config.password, jdbcUrl = s"jdbc:postgresql://${config.host}:${config.port}/${config.databaseName}", - awaitConnectionThreads = 2 + awaitConnectionThreads = config.awaitConnectionThreads ) } } @@ -575,7 +587,7 @@ object RepoModule { username = config.username, password = config.password, jdbcUrl = s"jdbc:postgresql://${config.host}:${config.port}/${config.databaseName}", - awaitConnectionThreads = 2 + awaitConnectionThreads = config.awaitConnectionThreads ) } } diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/config/AppConfig.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/config/AppConfig.scala index 24bca16a46..a6553772e3 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/config/AppConfig.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/config/AppConfig.scala @@ -2,6 +2,7 @@ package io.iohk.atala.agent.server.config import zio.config.* import zio.config.magnolia.Descriptor +import java.time.Duration final case class AppConfig( iris: IrisConfig, @@ -19,14 +20,31 @@ object AppConfig { final case class IrisConfig(service: GrpcServiceConfig) final case class CastorConfig(database: DatabaseConfig) -final case class PolluxConfig(database: DatabaseConfig) -final case class ConnectConfig(database: DatabaseConfig) +final case class PolluxConfig( + database: DatabaseConfig, + issueBgJobRecurrenceDelay: Duration, + issueBgJobProcessingParallelism: Int, + presentationBgJobRecurrenceDelay: Duration, + presentationBgJobProcessingParallelism: Int, +) +final case class ConnectConfig( + database: DatabaseConfig, + connectBgJobRecurrenceDelay: Duration, + connectBgJobProcessingParallelism: Int +) final case class PrismNodeConfig(service: GrpcServiceConfig) final case class GrpcServiceConfig(host: String, port: Int) -final case class DatabaseConfig(host: String, port: Int, databaseName: String, username: String, password: String) +final case class DatabaseConfig( + host: String, + port: Int, + databaseName: String, + username: String, + password: String, + awaitConnectionThreads: Int +) final case class AgentConfig( httpEndpoint: HttpEndpointConfig, diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/http/ZioHttpClient.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/http/ZioHttpClient.scala index 1a3ad7ebbe..df975c8877 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/http/ZioHttpClient.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/http/ZioHttpClient.scala @@ -2,7 +2,7 @@ package io.iohk.atala.agent.server.http import zio._ import zio.http._ -import zio.http.model._ +import zio.http.model.{Header => _, _} import zio.http.service._ import io.iohk.atala.mercury._ @@ -12,15 +12,24 @@ object ZioHttpClient { class ZioHttpClient extends HttpClient { - override def get(url: String): Task[HttpResponseBody] = + override def get(url: String): Task[HttpResponse] = zio.http.Client .request(url) .provideSomeLayer(zio.http.Client.default) .provideSomeLayer(zio.Scope.default) - .flatMap(_.body.asString) - .map(e => HttpResponseBody(e)) + .flatMap { response => + response.headers.toSeq.map(e => e) + response.body.asString + .map(body => + HttpResponse( + response.status.code, + response.headers.toSeq.map(h => Header(h.key.toString, h.value.toString)), + body + ) + ) + } - def postDIDComm(url: String, data: String): Task[HttpResponseBody] = + def postDIDComm(url: String, data: String): Task[HttpResponse] = zio.http.Client .request( url = url, // TODO make ERROR type @@ -32,6 +41,15 @@ class ZioHttpClient extends HttpClient { ) .provideSomeLayer(zio.http.Client.default) .provideSomeLayer(zio.Scope.default) - .flatMap(_.body.asString) - .map(e => HttpResponseBody(e)) + .flatMap { response => + response.headers.toSeq.map(e => e) + response.body.asString + .map(body => + HttpResponse( + response.status.code, + response.headers.toSeq.map(h => Header(h.key.toString, h.value.toString)), + body + ) + ) + } } diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobs.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobs.scala index e19623aa87..21214d4c14 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobs.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobs.scala @@ -20,8 +20,6 @@ import io.iohk.atala.resolvers.DIDResolver import io.iohk.atala.resolvers.UniversalDidResolver import java.io.IOException import io.iohk.atala.pollux.vc.jwt._ -import zhttp.service._ -import zhttp.http._ import io.iohk.atala.pollux.vc.jwt.W3CCredential import io.iohk.atala.pollux.core.model.PresentationRecord import io.iohk.atala.pollux.core.service.PresentationService @@ -51,25 +49,41 @@ import io.circe.Json import io.circe.syntax._ import io.iohk.atala.pollux.vc.jwt.JWT import io.iohk.atala.pollux.vc.jwt.{DidResolver => JwtDidResolver} +import io.iohk.atala.agent.server.config.AppConfig object BackgroundJobs { - val didCommExchanges = { + val issueCredentialDidCommExchanges = { for { credentialService <- ZIO.service[CredentialService] + config <- ZIO.service[AppConfig] records <- credentialService - .getIssueCredentialRecords() - .mapError(err => Throwable(s"Error occured while getting issue credential records: $err")) - _ <- ZIO.foreach(records)(performExchange) + .getIssueCredentialRecordsByStates( + IssueCredentialRecord.ProtocolState.OfferPending, + IssueCredentialRecord.ProtocolState.RequestPending, + IssueCredentialRecord.ProtocolState.RequestReceived, + IssueCredentialRecord.ProtocolState.CredentialPending, + IssueCredentialRecord.ProtocolState.CredentialGenerated + ) + .mapError(err => Throwable(s"Error occurred while getting Issue Credential records: $err")) + _ <- ZIO.foreachPar(records)(performExchange).withParallelism(config.pollux.issueBgJobProcessingParallelism) } yield () } val presentProofExchanges = { for { presentationService <- ZIO.service[PresentationService] + config <- ZIO.service[AppConfig] records <- presentationService - .getPresentationRecords() - .mapError(err => Throwable(s"Error occured while getting Presentation records: $err")) - _ <- ZIO.foreach(records)(performPresentation) + .getPresentationRecordsByStates( + PresentationRecord.ProtocolState.RequestPending, + PresentationRecord.ProtocolState.PresentationPending, + PresentationRecord.ProtocolState.PresentationGenerated, + PresentationRecord.ProtocolState.PresentationReceived + ) + .mapError(err => Throwable(s"Error occurred while getting Presentation records: $err")) + _ <- ZIO + .foreachPar(records)(performPresentation) + .withParallelism(config.pollux.presentationBgJobProcessingParallelism) } yield () } @@ -90,11 +104,14 @@ object BackgroundJobs { for { _ <- ZIO.log(s"IssueCredentialRecord: OfferPending (START)") didCommAgent <- buildDIDCommAgent(offer.from) - _ <- MessagingService + resp <- MessagingService .send(offer.makeMessage) .provideSomeLayer(didCommAgent) credentialService <- ZIO.service[CredentialService] - _ <- credentialService.markOfferSent(id) + _ <- { + if (resp.status >= 200 && resp.status < 300) credentialService.markOfferSent(id) + else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}") + } } yield () // Request should be sent from Holder to Issuer @@ -118,11 +135,14 @@ object BackgroundJobs { ) => for { didCommAgent <- buildDIDCommAgent(request.from) - _ <- MessagingService + resp <- MessagingService .send(request.makeMessage) .provideSomeLayer(didCommAgent) credentialService <- ZIO.service[CredentialService] - _ <- credentialService.markRequestSent(id) + _ <- { + if (resp.status >= 200 && resp.status < 300) credentialService.markRequestSent(id) + else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}") + } } yield () // 'automaticIssuance' is TRUE. Issuer automatically accepts the Request @@ -212,11 +232,14 @@ object BackgroundJobs { ) => for { didCommAgent <- buildDIDCommAgent(issue.from) - _ <- MessagingService + resp <- MessagingService .send(issue.makeMessage) .provideSomeLayer(didCommAgent) credentialService <- ZIO.service[CredentialService] - _ <- credentialService.markCredentialSent(id) + _ <- { + if (resp.status >= 200 && resp.status < 300) credentialService.markCredentialSent(id) + else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}") + } } yield () // Credential has been generated, published, and can now be sent to the Holder @@ -240,9 +263,12 @@ object BackgroundJobs { ) => for { didCommAgent <- buildDIDCommAgent(issue.from) - _ <- MessagingService.send(issue.makeMessage).provideSomeLayer(didCommAgent) + resp <- MessagingService.send(issue.makeMessage).provideSomeLayer(didCommAgent) credentialService <- ZIO.service[CredentialService] - _ <- credentialService.markCredentialSent(id) + _ <- { + if (resp.status >= 200 && resp.status < 300) credentialService.markCredentialSent(id) + else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}") + } } yield () case IssueCredentialRecord(id, _, _, _, _, _, _, _, _, _, ProblemReportPending, _, _, _, _, _) => ??? @@ -340,9 +366,12 @@ object BackgroundJobs { _ <- ZIO.log(s"PresentationRecord: RequestPending (Send Massage)") didOps <- ZIO.service[DidOps] didCommAgent <- buildDIDCommAgent(record.from) - _ <- MessagingService.send(record.makeMessage).provideSomeLayer(didCommAgent) + resp <- MessagingService.send(record.makeMessage).provideSomeLayer(didCommAgent) service <- ZIO.service[PresentationService] - _ <- service.markRequestPresentationSent(id) + _ <- { + if (resp.status >= 200 && resp.status < 300) service.markRequestPresentationSent(id) + else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}") + } } yield () case PresentationRecord(id, _, _, _, _, _, _, _, RequestSent, _, _, _, _) => // Verifier @@ -412,11 +441,14 @@ object BackgroundJobs { for { _ <- ZIO.log(s"PresentationRecord: PresentationPending (Send Message)") didCommAgent <- buildDIDCommAgent(p.from) - _ <- MessagingService + resp <- MessagingService .send(p.makeMessage) .provideSomeLayer(didCommAgent) service <- ZIO.service[PresentationService] - _ <- service.markPresentationSent(id) + _ <- { + if (resp.status >= 200 && resp.status < 300) service.markPresentationSent(id) + else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}") + } } yield () case PresentationRecord(id, _, _, _, _, _, _, _, PresentationSent, _, _, _, _) => ZIO.logDebug("PresentationRecord: PresentationSent") *> ZIO.unit diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala index ded93f4f64..81fdd21a9d 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala @@ -18,16 +18,21 @@ import io.iohk.atala.resolvers.UniversalDidResolver import io.iohk.atala.agent.walletapi.service.ManagedDIDService import io.iohk.atala.agent.walletapi.model.error.DIDSecretStorageError import io.iohk.atala.agent.walletapi.model.error.DIDSecretStorageError.KeyNotFoundError +import io.iohk.atala.agent.server.config.AppConfig object ConnectBackgroundJobs { val didCommExchanges = { for { connectionService <- ZIO.service[ConnectionService] + config <- ZIO.service[AppConfig] records <- connectionService - .getConnectionRecords() - .mapError(err => Throwable(s"Error occured while getting connection records: $err")) - _ <- ZIO.foreach(records)(performExchange) + .getConnectionRecordsByStates( + ConnectionRecord.ProtocolState.ConnectionRequestPending, + ConnectionRecord.ProtocolState.ConnectionResponsePending + ) + .mapError(err => Throwable(s"Error occurred while getting connection records: $err")) + _ <- ZIO.foreachPar(records)(performExchange).withParallelism(config.connect.connectBgJobProcessingParallelism) } yield () } @@ -54,9 +59,12 @@ object ConnectBackgroundJobs { val aux = for { didCommAgent <- buildDIDCommAgent(request.from) - _ <- MessagingService.send(request.makeMessage).provideSomeLayer(didCommAgent) + resp <- MessagingService.send(request.makeMessage).provideSomeLayer(didCommAgent) connectionService <- ZIO.service[ConnectionService] - _ <- connectionService.markConnectionRequestSent(id) + _ <- { + if (resp.status >= 200 && resp.status < 300) connectionService.markConnectionRequestSent(id) + else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}") + } } yield () // aux // TODO decrete metaRetries if it has a error @@ -78,9 +86,12 @@ object ConnectBackgroundJobs { ) if metaRetries > 0 => val aux = for { didCommAgent <- buildDIDCommAgent(response.from) - _ <- MessagingService.send(response.makeMessage).provideSomeLayer(didCommAgent) + resp <- MessagingService.send(response.makeMessage).provideSomeLayer(didCommAgent) connectionService <- ZIO.service[ConnectionService] - _ <- connectionService.markConnectionResponseSent(id) + _ <- { + if (resp.status >= 200 && resp.status < 300) connectionService.markConnectionResponseSent(id) + else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}") + } } yield () aux.tapError(ex =>