Skip to content

Commit

Permalink
feat(prism-agent): ATL-3349 connection state issue and perf improveme…
Browse files Browse the repository at this point in the history
…nts (#359)

* feat(prism-agent): make sure DB records are processed in parallel in background jobs

* feat(connect): allow ConnectionResponse reception while ConnectionRequest sending in ongoing but not yet committed

* feat(prism-agent): expose background jobs recurrence delays as agent config parameters

* feat(prism-agent): expose DB pools awaitConnectionThreads as agent config parameters

* feat(pollux): allow retrieving issue credential records for a specific set of protocol states

* feat(pollux): allow retrieving proof presentation records for a specific set of protocol states

* feat(connect): allow retrieving connection records for a specific set of protocol states

* chore(pollux): align credential service method names

* chore(pollux): fix presentation service method name

* feat(prism-agent): filter connect/issue/presentation records to process in background jobs by relevant states

* fix(prism-agent): fix method call typo

* chore(prism-agent): remove unused zio-http imports

* feat(prism-agent): expose background jobs processing parallelism as app config parameter

* fix(pollux): handle cases where peer's reply is received before initial request is marked as sent

* chore(connect): add protocol_state index

* chore(pollux): add issue credential protocol_state index

* feat(prism-agent): Upgrade connect to 0.8.0

* chore(connect): add 'limit 50' when loading records for bg processing

* chore(pollux): add 'limit 50' when loading issue/presentation records for bg processing

* chore(connect): merge main branch

* updated libs

* updated and fixes

* fix

* feat(prism-agent): bump connect, pollux and mercury versions

* feat(prism-agent): scalafmt

---------

Co-authored-by: FabioPinheiro <[email protected]>
  • Loading branch information
bvoiturier and FabioPinheiro authored Feb 7, 2023
1 parent 09f6a68 commit c77f160
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 59 deletions.
6 changes: 3 additions & 3 deletions prism-agent/service/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ object Dependencies {
val akka = "2.6.20"
val akkaHttp = "10.2.9"
val castor = "0.8.0"
val pollux = "0.23.0"
val connect = "0.9.0"
val pollux = "0.24.0"
val connect = "0.10.0"
val bouncyCastle = "1.70"
val logback = "1.4.5"
val mercury = "0.17.0"
val mercury = "0.18.0"
val zioJson = "0.3.0"
val tapir = "1.2.3"
val flyway = "9.8.3"
Expand Down
10 changes: 10 additions & 0 deletions prism-agent/service/server/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ castor {
username = ${?CASTOR_DB_USER}
password = "postgres"
password = ${?CASTOR_DB_PASSWORD}
awaitConnectionThreads = 8
}
}

Expand All @@ -43,7 +44,12 @@ pollux {
username = ${?POLLUX_DB_USER}
password = "postgres"
password = ${?POLLUX_DB_PASSWORD}
awaitConnectionThreads = 8
}
issueBgJobRecurrenceDelay = 2 seconds
issueBgJobProcessingParallelism = 25
presentationBgJobRecurrenceDelay = 2 seconds
presentationBgJobProcessingParallelism = 25
}

connect {
Expand All @@ -58,7 +64,10 @@ connect {
username = ${?CONNECT_DB_USER}
password = "postgres"
password = ${?CONNECT_DB_PASSWORD}
awaitConnectionThreads = 8
}
connectBgJobRecurrenceDelay = 2 seconds
connectBgJobProcessingParallelism = 25
}

agent {
Expand All @@ -80,5 +89,6 @@ agent {
username = ${?AGENT_DB_USER}
password = "postgres"
password = ${?AGENT_DB_PASSWORD}
awaitConnectionThreads = 8
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object Main extends ZIOAppDefault {
} yield ()

def appComponents(didCommServicePort: Int, restServicePort: Int) = for {
_ <- Modules.didCommExchangesJob.debug.fork
_ <- Modules.issueCredentialDidCommExchangesJob.debug.fork
_ <- Modules.presentProofExchangeJob.debug.fork
_ <- Modules.connectDidCommExchangesJob.debug.fork
_ <- Modules.didCommServiceEndpoint(didCommServicePort).debug.fork
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,26 +155,38 @@ object Modules {
Server.start(port, app)
}

val didCommExchangesJob: RIO[
DidOps & DIDResolver & JwtDidResolver & HttpClient & CredentialService & ManagedDIDService & DIDSecretStorage,
val issueCredentialDidCommExchangesJob: RIO[
AppConfig & DidOps & DIDResolver & JwtDidResolver & HttpClient & CredentialService & ManagedDIDService &
DIDSecretStorage,
Unit
] =
BackgroundJobs.didCommExchanges
.repeat(Schedule.spaced(10.seconds))
.unit
for {
config <- ZIO.service[AppConfig]
job <- BackgroundJobs.issueCredentialDidCommExchanges
.repeat(Schedule.spaced(config.pollux.issueBgJobRecurrenceDelay))
.unit
} yield job

val presentProofExchangeJob: RIO[
DidOps & DIDResolver & JwtDidResolver & HttpClient & PresentationService & ManagedDIDService & DIDSecretStorage,
AppConfig & DidOps & DIDResolver & JwtDidResolver & HttpClient & PresentationService & ManagedDIDService &
DIDSecretStorage,
Unit
] =
BackgroundJobs.presentProofExchanges
.repeat(Schedule.spaced(10.seconds))
.unit

val connectDidCommExchangesJob: RIO[DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService, Unit] =
ConnectBackgroundJobs.didCommExchanges
.repeat(Schedule.spaced(10.seconds))
.unit
for {
config <- ZIO.service[AppConfig]
job <- BackgroundJobs.presentProofExchanges
.repeat(Schedule.spaced(config.pollux.presentationBgJobRecurrenceDelay))
.unit
} yield job

val connectDidCommExchangesJob
: RIO[AppConfig & DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService, Unit] =
for {
config <- ZIO.service[AppConfig]
job <- ConnectBackgroundJobs.didCommExchanges
.repeat(Schedule.spaced(config.connect.connectBgJobRecurrenceDelay))
.unit
} yield job

val syncDIDPublicationStateFromDltJob: URIO[ManagedDIDService, Unit] =
BackgroundJobs.syncDIDPublicationStateFromDlt
Expand Down Expand Up @@ -523,7 +535,7 @@ object RepoModule {
username = config.username,
password = config.password,
jdbcUrl = s"jdbc:postgresql://${config.host}:${config.port}/${config.databaseName}",
awaitConnectionThreads = 2
awaitConnectionThreads = config.awaitConnectionThreads
)
}
}
Expand All @@ -549,7 +561,7 @@ object RepoModule {
username = config.username,
password = config.password,
jdbcUrl = s"jdbc:postgresql://${config.host}:${config.port}/${config.databaseName}",
awaitConnectionThreads = 2
awaitConnectionThreads = config.awaitConnectionThreads
)
}
}
Expand All @@ -575,7 +587,7 @@ object RepoModule {
username = config.username,
password = config.password,
jdbcUrl = s"jdbc:postgresql://${config.host}:${config.port}/${config.databaseName}",
awaitConnectionThreads = 2
awaitConnectionThreads = config.awaitConnectionThreads
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.iohk.atala.agent.server.config

import zio.config.*
import zio.config.magnolia.Descriptor
import java.time.Duration

final case class AppConfig(
iris: IrisConfig,
Expand All @@ -19,14 +20,31 @@ object AppConfig {
final case class IrisConfig(service: GrpcServiceConfig)

final case class CastorConfig(database: DatabaseConfig)
final case class PolluxConfig(database: DatabaseConfig)
final case class ConnectConfig(database: DatabaseConfig)
final case class PolluxConfig(
database: DatabaseConfig,
issueBgJobRecurrenceDelay: Duration,
issueBgJobProcessingParallelism: Int,
presentationBgJobRecurrenceDelay: Duration,
presentationBgJobProcessingParallelism: Int,
)
final case class ConnectConfig(
database: DatabaseConfig,
connectBgJobRecurrenceDelay: Duration,
connectBgJobProcessingParallelism: Int
)

final case class PrismNodeConfig(service: GrpcServiceConfig)

final case class GrpcServiceConfig(host: String, port: Int)

final case class DatabaseConfig(host: String, port: Int, databaseName: String, username: String, password: String)
final case class DatabaseConfig(
host: String,
port: Int,
databaseName: String,
username: String,
password: String,
awaitConnectionThreads: Int
)

final case class AgentConfig(
httpEndpoint: HttpEndpointConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.iohk.atala.agent.server.http

import zio._
import zio.http._
import zio.http.model._
import zio.http.model.{Header => _, _}
import zio.http.service._
import io.iohk.atala.mercury._

Expand All @@ -12,15 +12,24 @@ object ZioHttpClient {

class ZioHttpClient extends HttpClient {

override def get(url: String): Task[HttpResponseBody] =
override def get(url: String): Task[HttpResponse] =
zio.http.Client
.request(url)
.provideSomeLayer(zio.http.Client.default)
.provideSomeLayer(zio.Scope.default)
.flatMap(_.body.asString)
.map(e => HttpResponseBody(e))
.flatMap { response =>
response.headers.toSeq.map(e => e)
response.body.asString
.map(body =>
HttpResponse(
response.status.code,
response.headers.toSeq.map(h => Header(h.key.toString, h.value.toString)),
body
)
)
}

def postDIDComm(url: String, data: String): Task[HttpResponseBody] =
def postDIDComm(url: String, data: String): Task[HttpResponse] =
zio.http.Client
.request(
url = url, // TODO make ERROR type
Expand All @@ -32,6 +41,15 @@ class ZioHttpClient extends HttpClient {
)
.provideSomeLayer(zio.http.Client.default)
.provideSomeLayer(zio.Scope.default)
.flatMap(_.body.asString)
.map(e => HttpResponseBody(e))
.flatMap { response =>
response.headers.toSeq.map(e => e)
response.body.asString
.map(body =>
HttpResponse(
response.status.code,
response.headers.toSeq.map(h => Header(h.key.toString, h.value.toString)),
body
)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import io.iohk.atala.resolvers.DIDResolver
import io.iohk.atala.resolvers.UniversalDidResolver
import java.io.IOException
import io.iohk.atala.pollux.vc.jwt._
import zhttp.service._
import zhttp.http._
import io.iohk.atala.pollux.vc.jwt.W3CCredential
import io.iohk.atala.pollux.core.model.PresentationRecord
import io.iohk.atala.pollux.core.service.PresentationService
Expand Down Expand Up @@ -51,25 +49,41 @@ import io.circe.Json
import io.circe.syntax._
import io.iohk.atala.pollux.vc.jwt.JWT
import io.iohk.atala.pollux.vc.jwt.{DidResolver => JwtDidResolver}
import io.iohk.atala.agent.server.config.AppConfig

object BackgroundJobs {

val didCommExchanges = {
val issueCredentialDidCommExchanges = {
for {
credentialService <- ZIO.service[CredentialService]
config <- ZIO.service[AppConfig]
records <- credentialService
.getIssueCredentialRecords()
.mapError(err => Throwable(s"Error occured while getting issue credential records: $err"))
_ <- ZIO.foreach(records)(performExchange)
.getIssueCredentialRecordsByStates(
IssueCredentialRecord.ProtocolState.OfferPending,
IssueCredentialRecord.ProtocolState.RequestPending,
IssueCredentialRecord.ProtocolState.RequestReceived,
IssueCredentialRecord.ProtocolState.CredentialPending,
IssueCredentialRecord.ProtocolState.CredentialGenerated
)
.mapError(err => Throwable(s"Error occurred while getting Issue Credential records: $err"))
_ <- ZIO.foreachPar(records)(performExchange).withParallelism(config.pollux.issueBgJobProcessingParallelism)
} yield ()
}
val presentProofExchanges = {
for {
presentationService <- ZIO.service[PresentationService]
config <- ZIO.service[AppConfig]
records <- presentationService
.getPresentationRecords()
.mapError(err => Throwable(s"Error occured while getting Presentation records: $err"))
_ <- ZIO.foreach(records)(performPresentation)
.getPresentationRecordsByStates(
PresentationRecord.ProtocolState.RequestPending,
PresentationRecord.ProtocolState.PresentationPending,
PresentationRecord.ProtocolState.PresentationGenerated,
PresentationRecord.ProtocolState.PresentationReceived
)
.mapError(err => Throwable(s"Error occurred while getting Presentation records: $err"))
_ <- ZIO
.foreachPar(records)(performPresentation)
.withParallelism(config.pollux.presentationBgJobProcessingParallelism)
} yield ()
}

Expand All @@ -90,11 +104,14 @@ object BackgroundJobs {
for {
_ <- ZIO.log(s"IssueCredentialRecord: OfferPending (START)")
didCommAgent <- buildDIDCommAgent(offer.from)
_ <- MessagingService
resp <- MessagingService
.send(offer.makeMessage)
.provideSomeLayer(didCommAgent)
credentialService <- ZIO.service[CredentialService]
_ <- credentialService.markOfferSent(id)
_ <- {
if (resp.status >= 200 && resp.status < 300) credentialService.markOfferSent(id)
else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}")
}
} yield ()

// Request should be sent from Holder to Issuer
Expand All @@ -118,11 +135,14 @@ object BackgroundJobs {
) =>
for {
didCommAgent <- buildDIDCommAgent(request.from)
_ <- MessagingService
resp <- MessagingService
.send(request.makeMessage)
.provideSomeLayer(didCommAgent)
credentialService <- ZIO.service[CredentialService]
_ <- credentialService.markRequestSent(id)
_ <- {
if (resp.status >= 200 && resp.status < 300) credentialService.markRequestSent(id)
else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}")
}
} yield ()

// 'automaticIssuance' is TRUE. Issuer automatically accepts the Request
Expand Down Expand Up @@ -212,11 +232,14 @@ object BackgroundJobs {
) =>
for {
didCommAgent <- buildDIDCommAgent(issue.from)
_ <- MessagingService
resp <- MessagingService
.send(issue.makeMessage)
.provideSomeLayer(didCommAgent)
credentialService <- ZIO.service[CredentialService]
_ <- credentialService.markCredentialSent(id)
_ <- {
if (resp.status >= 200 && resp.status < 300) credentialService.markCredentialSent(id)
else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}")
}
} yield ()

// Credential has been generated, published, and can now be sent to the Holder
Expand All @@ -240,9 +263,12 @@ object BackgroundJobs {
) =>
for {
didCommAgent <- buildDIDCommAgent(issue.from)
_ <- MessagingService.send(issue.makeMessage).provideSomeLayer(didCommAgent)
resp <- MessagingService.send(issue.makeMessage).provideSomeLayer(didCommAgent)
credentialService <- ZIO.service[CredentialService]
_ <- credentialService.markCredentialSent(id)
_ <- {
if (resp.status >= 200 && resp.status < 300) credentialService.markCredentialSent(id)
else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}")
}
} yield ()

case IssueCredentialRecord(id, _, _, _, _, _, _, _, _, _, ProblemReportPending, _, _, _, _, _) => ???
Expand Down Expand Up @@ -340,9 +366,12 @@ object BackgroundJobs {
_ <- ZIO.log(s"PresentationRecord: RequestPending (Send Massage)")
didOps <- ZIO.service[DidOps]
didCommAgent <- buildDIDCommAgent(record.from)
_ <- MessagingService.send(record.makeMessage).provideSomeLayer(didCommAgent)
resp <- MessagingService.send(record.makeMessage).provideSomeLayer(didCommAgent)
service <- ZIO.service[PresentationService]
_ <- service.markRequestPresentationSent(id)
_ <- {
if (resp.status >= 200 && resp.status < 300) service.markRequestPresentationSent(id)
else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}")
}
} yield ()

case PresentationRecord(id, _, _, _, _, _, _, _, RequestSent, _, _, _, _) => // Verifier
Expand Down Expand Up @@ -412,11 +441,14 @@ object BackgroundJobs {
for {
_ <- ZIO.log(s"PresentationRecord: PresentationPending (Send Message)")
didCommAgent <- buildDIDCommAgent(p.from)
_ <- MessagingService
resp <- MessagingService
.send(p.makeMessage)
.provideSomeLayer(didCommAgent)
service <- ZIO.service[PresentationService]
_ <- service.markPresentationSent(id)
_ <- {
if (resp.status >= 200 && resp.status < 300) service.markPresentationSent(id)
else ZIO.logWarning(s"DIDComm sending error: [${resp.status}] - ${resp.bodyAsString}")
}
} yield ()
case PresentationRecord(id, _, _, _, _, _, _, _, PresentationSent, _, _, _, _) =>
ZIO.logDebug("PresentationRecord: PresentationSent") *> ZIO.unit
Expand Down
Loading

0 comments on commit c77f160

Please sign in to comment.