Skip to content

Commit

Permalink
feat(prism-agent): implement JDBC did nonsecret storage (#284)
Browse files Browse the repository at this point in the history
* Implement JdbcDIDNonSecretStorage

* feat(prism-agent): rename did nonsecret storage columns

* Refactor ManagedDIDService syncing DID state

* Add sync DID state backgroundJob

* PR cleanup
  • Loading branch information
patlo-iog authored Dec 22, 2022
1 parent 4385440 commit 7e116a3
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TYPE public.did_publication_status AS ENUM(
'CREATED',
'PUBLICATION_PENDING',
'PUBLISHED'
);

CREATE TABLE public.did_publication_state(
"did" TEXT NOT NULL PRIMARY KEY,
"publication_status" did_publication_status NOT NULL,
"atala_operation_content" BYTEA NOT NULL,
"publish_operation_id" BYTEA
);
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ object Main extends ZIOAppDefault {
_ <- Modules.presentProofExchangeJob.debug.fork
_ <- Modules.connectDidCommExchangesJob.debug.fork
_ <- Modules.didCommServiceEndpoint(didCommServicePort).debug.fork
_ <- Modules.syncDIDPublicationStateFromDltJob.fork
_ <- Modules.app(restServicePort).fork
_ <- Modules.zioApp.fork
_ <- ZIO.never
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ import io.iohk.atala.agent.server.http.{HttpRoutes, HttpServer, ZHttp4sBlazeServ
import io.iohk.atala.castor.core.service.{DIDService, DIDServiceImpl}
import io.iohk.atala.castor.core.util.DIDOperationValidator
import io.iohk.atala.agent.server.http.marshaller.{
ConnectionsManagementApiMarshallerImpl,
DIDApiMarshallerImpl,
DIDAuthenticationApiMarshallerImpl,
DIDRegistrarApiMarshallerImpl,
ConnectionsManagementApiMarshallerImpl
DIDRegistrarApiMarshallerImpl
}
import io.iohk.atala.agent.server.http.service.{
ConnectionsManagementApiServiceImpl,
DIDApiServiceImpl,
DIDAuthenticationApiServiceImpl,
DIDRegistrarApiServiceImpl,
ConnectionsManagementApiServiceImpl
DIDRegistrarApiServiceImpl
}
import io.iohk.atala.agent.openapi.api.{DIDApi, DIDAuthenticationApi, DIDRegistrarApi, ConnectionsManagementApi}
import io.iohk.atala.agent.openapi.api.{ConnectionsManagementApi, DIDApi, DIDAuthenticationApi, DIDRegistrarApi}
import cats.effect.std.Dispatcher
import com.typesafe.config.ConfigFactory
import doobie.util.transactor.Transactor
Expand All @@ -39,9 +39,9 @@ import io.iohk.atala.iris.proto.service.IrisServiceGrpc.IrisServiceStub
import io.iohk.atala.pollux.core.repository.CredentialRepository
import io.iohk.atala.pollux.core.service.CredentialService
import io.iohk.atala.pollux.sql.repository.JdbcCredentialRepository
import io.iohk.atala.pollux.sql.repository.{DbConfig => PolluxDbConfig}
import io.iohk.atala.connect.sql.repository.{DbConfig => ConnectDbConfig}
import io.iohk.atala.agent.server.sql.{DbConfig => AgentDbConfig}
import io.iohk.atala.pollux.sql.repository.DbConfig as PolluxDbConfig
import io.iohk.atala.connect.sql.repository.DbConfig as ConnectDbConfig
import io.iohk.atala.agent.server.sql.DbConfig as AgentDbConfig
import io.iohk.atala.agent.server.jobs.*
import zio.*
import zio.config.typesafe.TypesafeConfigSource
Expand Down Expand Up @@ -79,7 +79,7 @@ import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.pollux.schema.{SchemaRegistryServerEndpoints, VerificationPolicyServerEndpoints}
import io.iohk.atala.pollux.service.{SchemaRegistryServiceInMemory, VerificationPolicyServiceInMemory}
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.mercury.protocol.presentproof._
import io.iohk.atala.mercury.protocol.presentproof.*
import io.iohk.atala.agent.server.config.AgentConfig
import org.didcommx.didcomm.DIDComm
import io.iohk.atala.resolvers.UniversalDidResolver
Expand All @@ -88,7 +88,7 @@ import org.didcommx.didcomm.model.UnpackParams
import org.didcommx.didcomm.secret.Secret
import io.circe.ParsingFailure
import io.circe.DecodingFailure
import io.iohk.atala.agent.walletapi.sql.JdbcDIDSecretStorage
import io.iohk.atala.agent.walletapi.sql.{JdbcDIDNonSecretStorage, JdbcDIDSecretStorage}
import io.iohk.atala.resolvers.DIDResolver
import io.iohk.atala.agent.walletapi.storage.DIDSecretStorage
import io.iohk.atala.pollux.vc.jwt.{DidResolver => JwtDidResolver}
Expand Down Expand Up @@ -175,6 +175,12 @@ object Modules {
.repeat(Schedule.spaced(10.seconds))
.unit

val syncDIDPublicationStateFromDltJob: URIO[ManagedDIDService, Unit] =
BackgroundJobs.syncDIDPublicationStateFromDlt
.catchAll(e => ZIO.logError(s"error while syncing DID publication state: $e"))
.repeat(Schedule.spaced(10.seconds))
.unit

private[this] def extractFirstRecipientDid(jsonMessage: String): IO[ParsingFailure | DecodingFailure, String] = {
import io.circe._, io.circe.parser._
val doc = parse(jsonMessage).getOrElse(Json.Null)
Expand Down Expand Up @@ -409,8 +415,11 @@ object AppModule {
val didServiceLayer: TaskLayer[DIDService] =
(didOpValidatorLayer ++ GrpcModule.layers) >>> DIDServiceImpl.layer

val manageDIDServiceLayer: TaskLayer[ManagedDIDService] =
(didOpValidatorLayer ++ didServiceLayer ++ (RepoModule.agentTransactorLayer >>> JdbcDIDSecretStorage.layer)) >>> ManagedDIDService.layer
val manageDIDServiceLayer: TaskLayer[ManagedDIDService] = {
val secretStorageLayer = RepoModule.agentTransactorLayer >>> JdbcDIDSecretStorage.layer
val nonSecretStorageLayer = RepoModule.agentTransactorLayer >>> JdbcDIDNonSecretStorage.layer
(didOpValidatorLayer ++ didServiceLayer ++ secretStorageLayer ++ nonSecretStorageLayer) >>> ManagedDIDService.layer
}

val credentialServiceLayer: RLayer[DidComm, CredentialService] =
(GrpcModule.layers ++ RepoModule.credentialRepoLayer) >>> CredentialServiceImpl.layer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.iohk.atala.agent.server.http.model

import akka.http.scaladsl.server.StandardRoute
import io.iohk.atala.agent.openapi.model.ErrorResponse
import io.iohk.atala.agent.walletapi.model.error.{CreateManagedDIDError, ListManagedDIDError, PublishManagedDIDError}
import io.iohk.atala.agent.walletapi.model.error.{CreateManagedDIDError, GetManagedDIDError, PublishManagedDIDError}
import io.iohk.atala.castor.core.model.did.w3c.DIDResolutionErrorRepr
import io.iohk.atala.castor.core.model.error.DIDOperationError
import io.iohk.atala.castor.core.model.error.DIDResolutionError
Expand Down Expand Up @@ -46,8 +46,8 @@ trait OASErrorModelHelper {
}
}

given ToErrorResponse[ListManagedDIDError] with {
override def toErrorResponse(e: ListManagedDIDError): ErrorResponse = {
given ToErrorResponse[GetManagedDIDError] with {
override def toErrorResponse(e: GetManagedDIDError): ErrorResponse = {
ErrorResponse(
`type` = "error-type",
title = "error-title",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,4 +445,6 @@ object BackgroundJobs {
ZIO.unit
}

val syncDIDPublicationStateFromDlt = ZIO.serviceWithZIO[ManagedDIDService](_.syncManagedDIDState)

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package io.iohk.atala.agent.walletapi.model.error

import io.iohk.atala.castor.core.model.error.DIDOperationError

sealed trait ListManagedDIDError
sealed trait GetManagedDIDError

object ListManagedDIDError {
final case class WalletStorageError(cause: Throwable) extends ListManagedDIDError
final case class OperationError(cause: DIDOperationError) extends ListManagedDIDError
object GetManagedDIDError {
final case class WalletStorageError(cause: Throwable) extends GetManagedDIDError
final case class OperationError(cause: DIDOperationError) extends GetManagedDIDError
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.iohk.atala.agent.walletapi.model

import io.iohk.atala.castor.core.model.did.PrismDID
import io.iohk.atala.castor.core.model.error.DIDOperationError

package object error {
final case class CommonWalletStorageError(cause: Throwable)

given Conversion[CommonWalletStorageError, PublishManagedDIDError] = e =>
PublishManagedDIDError.WalletStorageError(e.cause)
given Conversion[DIDOperationError, PublishManagedDIDError] = PublishManagedDIDError.OperationError(_)

given Conversion[CommonWalletStorageError, GetManagedDIDError] = e => GetManagedDIDError.WalletStorageError(e.cause)
given Conversion[DIDOperationError, GetManagedDIDError] = GetManagedDIDError.OperationError(_)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ import io.iohk.atala.agent.walletapi.model.{
ManagedDIDTemplate
}
import io.iohk.atala.agent.walletapi.model.ECCoordinates.*
import io.iohk.atala.agent.walletapi.model.error.{
CreateManagedDIDError,
ListManagedDIDError,
PublishManagedDIDError,
DIDSecretStorageError
}
import io.iohk.atala.agent.walletapi.model.error.{given, *}
import io.iohk.atala.agent.walletapi.service.ManagedDIDService.{CreateDIDSecret, DEFAULT_MASTER_KEY_ID}
import io.iohk.atala.agent.walletapi.storage.{
DIDNonSecretStorage,
Expand Down Expand Up @@ -66,51 +61,28 @@ final class ManagedDIDService private[walletapi] (
private val AGREEMENT_KEY_ID = "agreement"
private val AUTHENTICATION_KEY_ID = "authentication"

def listManagedDID: IO[ListManagedDIDError, Seq[ManagedDIDDetail]] = nonSecretStorage.listManagedDID
.mapBoth(
ListManagedDIDError.WalletStorageError.apply,
_.toSeq.map { case (did, state) =>
ManagedDIDDetail(did = did.asCanonical, state = state)
}
)
.flatMap { dids =>
ZIO.foreach(dids) { didDetail =>
// state in wallet maybe stale, update it from DLT
syncDIDStateFromDLT(didDetail.state)
.mapError(ListManagedDIDError.OperationError.apply)
.tap(state =>
nonSecretStorage
.setManagedDIDState(didDetail.did, state)
.mapError(ListManagedDIDError.WalletStorageError.apply)
)
.map(didDetail -> _)
}
def syncManagedDIDState: IO[GetManagedDIDError, Unit] = nonSecretStorage.listManagedDID
.mapError(GetManagedDIDError.WalletStorageError.apply)
.flatMap { kv =>
ZIO.foreach(kv.keys.map(_.asCanonical))(computeNewDIDStateFromDLTAndPersist[GetManagedDIDError])
}
.map(_.map { case (didDetail, newState) => didDetail.copy(state = newState) })
.unit

def publishStoredDID(did: CanonicalPrismDID): IO[PublishManagedDIDError, ScheduleDIDOperationOutcome] = {
def syncDLTStateAndPersist =
nonSecretStorage
.getManagedDIDState(did)
.mapError(PublishManagedDIDError.WalletStorageError.apply)
.flatMap(state => ZIO.fromOption(state).mapError(_ => PublishManagedDIDError.DIDNotFound(did)))
.flatMap(state => syncDIDStateFromDLT(state).mapError(PublishManagedDIDError.OperationError.apply))
.tap(state =>
nonSecretStorage.setManagedDIDState(did, state).mapError(PublishManagedDIDError.WalletStorageError.apply)
)
def listManagedDID: IO[GetManagedDIDError, Seq[ManagedDIDDetail]] =
for {
_ <- syncManagedDIDState // state in wallet maybe stale, update it from DLT
dids <- nonSecretStorage.listManagedDID.mapError(GetManagedDIDError.WalletStorageError.apply)
} yield dids.toSeq.map { case (did, state) => ManagedDIDDetail(did.asCanonical, state) }

def publishStoredDID(did: CanonicalPrismDID): IO[PublishManagedDIDError, ScheduleDIDOperationOutcome] = {
def submitOperation(operation: PrismDIDOperation.Create) =
for {
masterKeyPair <-
secretStorage
.getKey(did, DEFAULT_MASTER_KEY_ID)
.mapError(PublishManagedDIDError.WalletStorageError.apply)
.flatMap(maybeKey =>
ZIO
.fromOption(maybeKey)
.orDieWith(_ =>
new Exception("master-key must exists in the wallet for DID publication operation signature")
)
.someOrElseZIO(
ZIO.die(Exception("master-key must exists in the wallet for DID publication operation signature"))
)
signedAtalaOperation <- ZIO
.fromTry(
Expand All @@ -133,7 +105,11 @@ final class ManagedDIDService private[walletapi] (
} yield outcome

for {
didState <- syncDLTStateAndPersist
_ <- computeNewDIDStateFromDLTAndPersist[PublishManagedDIDError](did)
didState <- nonSecretStorage
.getManagedDIDState(did)
.mapError(PublishManagedDIDError.WalletStorageError.apply)
.someOrFail(PublishManagedDIDError.DIDNotFound(did))
outcome <- didState match {
case ManagedDIDState.Created(operation) => submitOperation(operation)
case ManagedDIDState.PublicationPending(operation, operationId) =>
Expand Down Expand Up @@ -217,8 +193,25 @@ final class ManagedDIDService private[walletapi] (
y = Base64UrlString.fromByteArray(keyPair.publicKey.p.y.toPaddedByteArray(CURVE))
)

/** Reconcile state with DLT and return a correct status */
private def syncDIDStateFromDLT(state: ManagedDIDState): IO[DIDOperationError, ManagedDIDState] = {
/** Reconcile state with DLT and write new state to the storage */
private def computeNewDIDStateFromDLTAndPersist[E](
did: CanonicalPrismDID
)(using
c1: Conversion[CommonWalletStorageError, E],
c2: Conversion[DIDOperationError, E]
): IO[E, Unit] = {
nonSecretStorage
.getManagedDIDState(did)
.mapError[E](CommonWalletStorageError.apply)
.flatMap(maybeState => ZIO.foreach(maybeState)(computeNewDIDStateFromDLT(_).mapError[E](e => e)))
.flatMap(maybeState =>
ZIO.foreach(maybeState)(nonSecretStorage.setManagedDIDState(did, _)).mapError[E](CommonWalletStorageError.apply)
)
.unit
}

/** Reconcile state with DLT and return an updated state */
private def computeNewDIDStateFromDLT(state: ManagedDIDState): IO[DIDOperationError, ManagedDIDState] = {
state match {
case s @ ManagedDIDState.PublicationPending(operation, operationId) =>
didService
Expand Down Expand Up @@ -277,7 +270,7 @@ object ManagedDIDService {
ManagedDIDService(_, _, _, _)
)

val layer: URLayer[DIDOperationValidator & DIDService & DIDSecretStorage, ManagedDIDService] =
InMemoryDIDNonSecretStorage.layer >>> ZLayer.fromFunction(ManagedDIDService(_, _, _, _))
val layer: URLayer[DIDOperationValidator & DIDService & DIDSecretStorage & DIDNonSecretStorage, ManagedDIDService] =
ZLayer.fromFunction(ManagedDIDService(_, _, _, _))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.iohk.atala.agent.walletapi.sql

import doobie.*
import doobie.implicits.*
import doobie.postgres.implicits.*

import io.iohk.atala.agent.walletapi.model.ManagedDIDState
import io.iohk.atala.agent.walletapi.storage.DIDNonSecretStorage
import io.iohk.atala.castor.core.model.did.PrismDID
import zio.*
import zio.interop.catz.*

class JdbcDIDNonSecretStorage(xa: Transactor[Task]) extends DIDNonSecretStorage {

override def getManagedDIDState(did: PrismDID): Task[Option[ManagedDIDState]] = {
val cxnIO =
sql"""
| SELECT
| did,
| publication_status,
| atala_operation_content,
| publish_operation_id
| FROM public.did_publication_state
| WHERE did = $did
""".stripMargin
.query[DIDPublicationStateRow]
.option

cxnIO
.transact(xa)
.flatMap(_.map(_.toDomain).fold(ZIO.none)(t => ZIO.fromTry(t).asSome))
}

override def setManagedDIDState(did: PrismDID, state: ManagedDIDState): Task[Unit] = {
val row = DIDPublicationStateRow.from(did, state)
val cxnIO = sql"""
| INSERT INTO public.did_publication_state(
| did,
| publication_status,
| atala_operation_content,
| publish_operation_id
| )
| VALUES (
| ${row.did},
| ${row.publicationStatus},
| ${row.atalaOperationContent},
| ${row.publishOperationId}
| )
| ON CONFLICT (did) DO UPDATE SET
| publication_status = EXCLUDED.publication_status,
| atala_operation_content = EXCLUDED.atala_operation_content,
| publish_operation_id = EXCLUDED.publish_operation_id
""".stripMargin.update

cxnIO.run.transact(xa).unit
}

override def listManagedDID: Task[Map[PrismDID, ManagedDIDState]] = {
val cxnIO =
sql"""
| SELECT
| did,
| publication_status,
| atala_operation_content,
| publish_operation_id
| FROM public.did_publication_state
""".stripMargin
.query[DIDPublicationStateRow]
.to[List]

cxnIO
.transact(xa)
.map(_.map(row => row.toDomain.map(row.did -> _)))
.flatMap(ls => ZIO.foreach(ls)(ZIO.fromTry[(PrismDID, ManagedDIDState)](_)))
.map(_.toMap)
}

}

object JdbcDIDNonSecretStorage {
val layer: URLayer[Transactor[Task], DIDNonSecretStorage] = ZLayer.fromFunction(new JdbcDIDNonSecretStorage(_))
}
Loading

0 comments on commit 7e116a3

Please sign in to comment.