Skip to content

Commit

Permalink
feat(prism-agent): fix prism-agent major issues (#199)
Browse files Browse the repository at this point in the history
* chore(prism-agent): bump pollux & connect to latest versions

* chore(prism-agent): integrate renamed CredentialServiceError & ConnectionServiceError traits

* chore(prism-agent): handle case where DIDComm sender receives a non-2xx response from peer

* chore(prism-agent): make sure issue & connect background jobs do not stop running after record processing errors
  • Loading branch information
bvoiturier authored Dec 2, 2022
1 parent 8a00111 commit 1dc7339
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 79 deletions.
6 changes: 3 additions & 3 deletions prism-agent/service/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ object Dependencies {
val akka = "2.6.20"
val akkaHttp = "10.2.9"
val castor = "0.4.0"
val pollux = "0.5.0"
val connect = "0.2.0"
val pollux = "0.6.0"
val connect = "0.3.0"
val bouncyCastle = "1.70"
val logback = "1.4.4"
val mercury = "0.7.0"
val mercury = "0.8.0"
val zioJson = "0.3.0"
val tapir = "1.2.2"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ import io.iohk.atala.mercury.*
import io.iohk.atala.mercury.model.*
import io.iohk.atala.mercury.model.error.*
import io.iohk.atala.mercury.protocol.issuecredential.*
import io.iohk.atala.pollux.core.model.error.IssueCredentialError
import io.iohk.atala.pollux.core.model.error.IssueCredentialError.RepositoryError
import io.iohk.atala.pollux.core.model.error.CredentialServiceError.RepositoryError
import io.iohk.atala.prism.protos.node_api.NodeServiceGrpc

import java.io.IOException
Expand All @@ -76,9 +75,10 @@ import io.iohk.atala.connect.core.repository.ConnectionRepository
import io.iohk.atala.connect.sql.repository.JdbcConnectionRepository
import io.iohk.atala.mercury.protocol.connection.ConnectionRequest
import io.iohk.atala.mercury.protocol.connection.ConnectionResponse
import io.iohk.atala.connect.core.model.error.ConnectionError
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.pollux.schema.{SchemaRegistryServerEndpoints, VerificationPolicyServerEndpoints}
import io.iohk.atala.pollux.service.{SchemaRegistryServiceInMemory, VerificationPolicyServiceInMemory}
import io.iohk.atala.connect.core.model.error.ConnectionServiceError

object Modules {

Expand Down Expand Up @@ -235,15 +235,15 @@ object Modules {
// Receive and store ConnectionRequest
maybeRecord <- connectionService
.receiveConnectionRequest(connectionRequest)
.catchSome { case ConnectionError.RepositoryError(cause) =>
.catchSome { case ConnectionServiceError.RepositoryError(cause) =>
ZIO.logError(cause.getMessage()) *>
ZIO.fail(cause)
}
.catchAll { case ex: IOException => ZIO.fail(ex) }
// Accept the ConnectionRequest
_ <- connectionService
.acceptConnectionRequest(maybeRecord.get.id) // TODO: get
.catchSome { case ConnectionError.RepositoryError(cause) =>
.catchSome { case ConnectionServiceError.RepositoryError(cause) =>
ZIO.logError(cause.getMessage()) *>
ZIO.fail(cause)
}
Expand All @@ -259,7 +259,7 @@ object Modules {
_ <- ZIO.logInfo("Got ConnectionResponse: " + connectionResponse)
_ <- connectionService
.receiveConnectionResponse(connectionResponse)
.catchSome { case ConnectionError.RepositoryError(cause) =>
.catchSome { case ConnectionServiceError.RepositoryError(cause) =>
ZIO.logError(cause.getMessage()) *>
ZIO.fail(cause)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package io.iohk.atala.agent.server.http.model

import akka.http.scaladsl.server.StandardRoute
import io.iohk.atala.agent.openapi.model.ErrorResponse
import io.iohk.atala.agent.walletapi.model.error.{CreateManagedDIDError, PublishManagedDIDError}
import io.iohk.atala.agent.walletapi.model.error.CreateManagedDIDError
import io.iohk.atala.agent.walletapi.model.error.PublishManagedDIDError
import io.iohk.atala.castor.core.model.did.w3c.DIDResolutionErrorRepr
import io.iohk.atala.castor.core.model.error.{DIDOperationError, DIDResolutionError}
import io.iohk.atala.castor.core.model.error.DIDOperationError
import io.iohk.atala.castor.core.model.error.DIDResolutionError
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.pollux.core.model.error.CredentialServiceError

import java.util.UUID
import io.iohk.atala.pollux.core.model.error.IssueCredentialError
import io.iohk.atala.connect.core.model.error.ConnectionError

trait ToErrorResponse[E] {
def toErrorResponse(e: E): ErrorResponse
Expand Down Expand Up @@ -91,8 +93,8 @@ trait OASErrorModelHelper {
}
}

given ToErrorResponse[IssueCredentialError] with {
def toErrorResponse(error: IssueCredentialError): ErrorResponse = {
given ToErrorResponse[CredentialServiceError] with {
def toErrorResponse(error: CredentialServiceError): ErrorResponse = {
ErrorResponse(
`type` = "error-type",
title = "error-title",
Expand All @@ -103,8 +105,8 @@ trait OASErrorModelHelper {
}
}

given ToErrorResponse[ConnectionError] with {
def toErrorResponse(error: ConnectionError): ErrorResponse = {
given ToErrorResponse[ConnectionServiceError] with {
def toErrorResponse(error: ConnectionServiceError): ErrorResponse = {
ErrorResponse(
`type` = "error-type",
title = "error-title",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package io.iohk.atala.agent.server.http.service

import io.iohk.atala.agent.openapi.api._
import io.iohk.atala.agent.openapi.model._
import akka.http.scaladsl.server.Directives.*
import akka.http.scaladsl.marshalling.ToEntityMarshaller
import akka.http.scaladsl.server.Directives.*
import akka.http.scaladsl.server.Route
import zio._
import io.iohk.atala.connect.core.service.ConnectionService
import io.iohk.atala.agent.openapi.api._
import io.iohk.atala.agent.openapi.model._
import io.iohk.atala.agent.server.http.model.HttpServiceError
import io.iohk.atala.agent.server.http.model.OASDomainModelHelper
import io.iohk.atala.agent.server.http.model.OASErrorModelHelper
import io.iohk.atala.agent.server.http.model.HttpServiceError
import io.iohk.atala.connect.core.model.error.ConnectionError
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.connect.core.service.ConnectionService
import io.iohk.atala.mercury.PeerDID
import io.iohk.atala.mercury.protocol.invitation.v2.Invitation
import zio._

class ConnectionsManagementApiServiceImpl(connectionService: ConnectionService)(using runtime: zio.Runtime[Any])
extends ConnectionsManagementApiService,
Expand All @@ -27,7 +27,7 @@ class ConnectionsManagementApiServiceImpl(connectionService: ConnectionService)(
val result = for {
record <- connectionService
.createConnectionInvitation(request.label)
.mapError(HttpServiceError.DomainError[ConnectionError].apply)
.mapError(HttpServiceError.DomainError[ConnectionServiceError].apply)
} yield record

onZioSuccess(result.mapBoth(_.toOAS, _.toOAS).either) {
Expand All @@ -43,7 +43,7 @@ class ConnectionsManagementApiServiceImpl(connectionService: ConnectionService)(
val result = for {
outcome <- connectionService
.getConnectionRecords()
.mapError(HttpServiceError.DomainError[ConnectionError].apply)
.mapError(HttpServiceError.DomainError[ConnectionServiceError].apply)
} yield outcome

onZioSuccess(result.mapBoth(_.toOAS, _.map(_.toOAS)).either) {
Expand All @@ -67,7 +67,7 @@ class ConnectionsManagementApiServiceImpl(connectionService: ConnectionService)(
recordId <- connectionId.toUUID
outcome <- connectionService
.getConnectionRecord(recordId)
.mapError(HttpServiceError.DomainError[ConnectionError].apply)
.mapError(HttpServiceError.DomainError[ConnectionServiceError].apply)
} yield outcome

onZioSuccess(result.mapBoth(_.toOAS, _.map(_.toOAS)).either) {
Expand All @@ -84,10 +84,10 @@ class ConnectionsManagementApiServiceImpl(connectionService: ConnectionService)(
val result = for {
record <- connectionService
.receiveConnectionInvitation(request.invitation)
.mapError(HttpServiceError.DomainError[ConnectionError].apply)
.mapError(HttpServiceError.DomainError[ConnectionServiceError].apply)
record <- connectionService
.acceptConnectionInvitation(record.id)
.mapError(HttpServiceError.DomainError[ConnectionError].apply)
.mapError(HttpServiceError.DomainError[ConnectionServiceError].apply)
} yield record

onZioSuccess(result.mapBoth(_.toOAS, _.map(_.toOAS)).either) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package io.iohk.atala.agent.server.http.service

import akka.http.scaladsl.server.Directives.*
import akka.http.scaladsl.marshalling.ToEntityMarshaller
import akka.http.scaladsl.server.Directives.*
import akka.http.scaladsl.server.Route
import io.iohk.atala.agent.openapi.api.IssueCredentialsProtocolApiService
import zio.*
import io.iohk.atala.pollux.core.service.CredentialService
import io.iohk.atala.agent.openapi.model.*
import java.util.UUID
import io.iohk.atala.agent.server.http.model.HttpServiceError
import io.iohk.atala.pollux.core.model.error.IssueCredentialError
import io.iohk.atala.agent.server.http.model.{HttpServiceError, OASDomainModelHelper, OASErrorModelHelper}
import scala.util.Try
import io.iohk.atala.agent.server.http.model.HttpServiceError.InvalidPayload
import io.iohk.atala.agent.server.http.model.OASDomainModelHelper
import io.iohk.atala.agent.server.http.model.OASErrorModelHelper
import io.iohk.atala.pollux.core.model.error.CredentialServiceError
import io.iohk.atala.pollux.core.service.CredentialService
import zio.*

import java.util.UUID
import scala.util.Try

class IssueCredentialsProtocolApiServiceImpl(credentialService: CredentialService)(using runtime: zio.Runtime[Any])
extends IssueCredentialsProtocolApiService,
Expand All @@ -35,7 +37,7 @@ class IssueCredentialsProtocolApiServiceImpl(credentialService: CredentialServic
request.automaticIssuance.orElse(Some(true)),
request.awaitConfirmation.orElse(Some(false))
)
.mapError(HttpServiceError.DomainError[IssueCredentialError].apply)
.mapError(HttpServiceError.DomainError[CredentialServiceError].apply)
} yield outcome

onZioSuccess(result.mapBoth(_.toOAS, _.toOAS).either) {
Expand All @@ -51,7 +53,7 @@ class IssueCredentialsProtocolApiServiceImpl(credentialService: CredentialServic
val result = for {
outcome <- credentialService
.getIssueCredentialRecords()
.mapError(HttpServiceError.DomainError[IssueCredentialError].apply)
.mapError(HttpServiceError.DomainError[CredentialServiceError].apply)
} yield outcome

onZioSuccess(result.mapBoth(_.toOAS, _.map(_.toOAS)).either) {
Expand All @@ -76,7 +78,7 @@ class IssueCredentialsProtocolApiServiceImpl(credentialService: CredentialServic
uuid <- recordId.toUUID
outcome <- credentialService
.getIssueCredentialRecord(uuid)
.mapError(HttpServiceError.DomainError[IssueCredentialError].apply)
.mapError(HttpServiceError.DomainError[CredentialServiceError].apply)
} yield outcome

onZioSuccess(result.mapBoth(_.toOAS, _.map(_.toOAS)).either) {
Expand All @@ -94,7 +96,7 @@ class IssueCredentialsProtocolApiServiceImpl(credentialService: CredentialServic
uuid <- recordId.toUUID
outcome <- credentialService
.acceptCredentialOffer(uuid)
.mapError(HttpServiceError.DomainError[IssueCredentialError].apply)
.mapError(HttpServiceError.DomainError[CredentialServiceError].apply)
} yield outcome

onZioSuccess(result.mapBoth(_.toOAS, _.map(_.toOAS)).either) {
Expand All @@ -112,7 +114,7 @@ class IssueCredentialsProtocolApiServiceImpl(credentialService: CredentialServic
uuid <- recordId.toUUID
outcome <- credentialService
.acceptCredentialRequest(uuid)
.mapError(HttpServiceError.DomainError[IssueCredentialError].apply)
.mapError(HttpServiceError.DomainError[CredentialServiceError].apply)
} yield outcome

onZioSuccess(result.mapBoth(_.toOAS, _.map(_.toOAS)).either) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ import scala.jdk.CollectionConverters.*
import zio.*
import io.iohk.atala.pollux.core.service.CredentialService
import io.iohk.atala.pollux.core.model.IssueCredentialRecord
import io.iohk.atala.pollux.core.model.error.CreateCredentialPayloadFromRecordError
import io.iohk.atala.pollux.core.model.error.IssueCredentialError
import io.iohk.atala.pollux.core.model.error.MarkCredentialRecordsAsPublishQueuedError
import io.iohk.atala.pollux.core.model.error.PublishCredentialBatchError
import io.iohk.atala.pollux.core.model.error.CredentialServiceError
import io.iohk.atala.pollux.core.service.CredentialService
import io.iohk.atala.pollux.vc.jwt.W3cCredentialPayload
import zio.*
Expand Down Expand Up @@ -41,7 +38,7 @@ object BackgroundJobs {

private[this] def performExchange(
record: IssueCredentialRecord
): ZIO[DidComm & CredentialService, Throwable, Unit] = {
): URIO[DidComm & CredentialService, Unit] = {
import IssueCredentialRecord._
import IssueCredentialRecord.ProtocolState._
import IssueCredentialRecord.PublicationState._
Expand All @@ -50,21 +47,21 @@ object BackgroundJobs {
_ <- record match {
// Offer should be sent from Issuer to Holder
case IssueCredentialRecord(id, _, _, _, _, Role.Issuer, _, _, _, _, OfferPending, _, Some(offer), _, _) =>
for {
(for {
_ <- ZIO.log(s"IssueCredentialRecord: OfferPending (START)")
didComm <- ZIO.service[DidComm]
_ <- sendMessage(offer.makeMessage)
credentialService <- ZIO.service[CredentialService]
_ <- credentialService.markOfferSent(id)
} yield ()
} yield ()): ZIO[DidComm & CredentialService, CredentialServiceError | MercuryException, Unit]

// Request should be sent from Holder to Issuer
case IssueCredentialRecord(id, _, _, _, _, Role.Holder, _, _, _, _, RequestPending, _, _, Some(request), _) =>
for {
(for {
_ <- sendMessage(request.makeMessage)
credentialService <- ZIO.service[CredentialService]
_ <- credentialService.markRequestSent(id)
} yield ()
} yield ()): ZIO[DidComm & CredentialService, CredentialServiceError | MercuryException, Unit]

// 'automaticIssuance' is TRUE. Issuer automatically accepts the Request
case IssueCredentialRecord(id, _, _, _, _, Role.Issuer, _, _, Some(true), _, RequestReceived, _, _, _, _) =>
Expand Down Expand Up @@ -130,11 +127,11 @@ object BackgroundJobs {
_,
Some(issue)
) =>
for {
(for {
_ <- sendMessage(issue.makeMessage)
credentialService <- ZIO.service[CredentialService]
_ <- credentialService.markCredentialSent(id)
} yield ()
} yield ()): ZIO[DidComm & CredentialService, CredentialServiceError | MercuryException, Unit]

// Credential has been generated, published, and can now be sent to the Holder
case IssueCredentialRecord(
Expand All @@ -154,24 +151,27 @@ object BackgroundJobs {
_,
Some(issue)
) =>
for {
(for {
_ <- sendMessage(issue.makeMessage)
credentialService <- ZIO.service[CredentialService]
_ <- credentialService.markCredentialSent(id)
} yield ()
} yield ()): ZIO[DidComm & CredentialService, CredentialServiceError | MercuryException, Unit]

case IssueCredentialRecord(id, _, _, _, _, _, _, _, _, _, ProblemReportPending, _, _, _, _) => ???
case IssueCredentialRecord(id, _, _, _, _, _, _, _, _, _, _, _, _, _, _) => ZIO.unit
}
} yield ()

aux.catchAll {
case ex: TransportError => // : io.iohk.atala.mercury.model.error.MercuryError | java.io.IOException =>
ex.printStackTrace()
ZIO.logError(ex.getMessage()) *>
ZIO.fail(mercuryErrorAsThrowable(ex))
case ex: IOException => ZIO.fail(ex)
}
aux
.catchAll {
case ex: MercuryException =>
ZIO.logErrorCause(s"DIDComm communication error processing record: ${record.id}", Cause.fail(ex))
case ex: CredentialServiceError =>
ZIO.logErrorCause(s"Credential service error processing record: ${record.id} ", Cause.fail(ex))
}
.catchAllDefect { case throwable =>
ZIO.logErrorCause(s"Issue Credential protocol defect processing record: ${record.id}", Cause.fail(throwable))
}
}

val publishCredentialsToDlt = {
Expand All @@ -183,10 +183,7 @@ object BackgroundJobs {
}

private[this] def performPublishCredentialsToDlt(credentialService: CredentialService) = {
type PublishToDltError = IssueCredentialError | CreateCredentialPayloadFromRecordError |
PublishCredentialBatchError | MarkCredentialRecordsAsPublishQueuedError

val res: ZIO[Any, PublishToDltError, Unit] = for {
val res: ZIO[Any, CredentialServiceError, Unit] = for {
records <- credentialService.getCredentialRecordsByState(IssueCredentialRecord.ProtocolState.CredentialPending)
// NOTE: the line below is a potentially slow operation, because <createCredentialPayloadFromRecord> makes a database SELECT call,
// so calling this function n times will make n database SELECT calls, while it can be optimized to get
Expand Down
Loading

0 comments on commit 1dc7339

Please sign in to comment.