Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prism-agent): implement JDBC did nonsecret storage #284

Merged
merged 5 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TYPE public.did_publication_status AS ENUM(
'CREATED',
'PUBLICATION_PENDING',
'PUBLISHED'
);

CREATE TABLE public.did_publication_state(
"did" TEXT NOT NULL PRIMARY KEY,
"publication_status" did_publication_status NOT NULL,
"atala_operation_content" BYTEA NOT NULL,
"publish_operation_id" BYTEA
);
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ object Main extends ZIOAppDefault {
_ <- Modules.presentProofExchangeJob.debug.fork
_ <- Modules.connectDidCommExchangesJob.debug.fork
_ <- Modules.didCommServiceEndpoint(didCommServicePort).debug.fork
_ <- Modules.syncDIDPublicationStateFromDltJob.fork
_ <- Modules.app(restServicePort).fork
_ <- Modules.zioApp.fork
_ <- ZIO.never
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ import io.iohk.atala.agent.server.http.{HttpRoutes, HttpServer, ZHttp4sBlazeServ
import io.iohk.atala.castor.core.service.{DIDService, DIDServiceImpl}
import io.iohk.atala.castor.core.util.DIDOperationValidator
import io.iohk.atala.agent.server.http.marshaller.{
ConnectionsManagementApiMarshallerImpl,
DIDApiMarshallerImpl,
DIDAuthenticationApiMarshallerImpl,
DIDRegistrarApiMarshallerImpl,
ConnectionsManagementApiMarshallerImpl
DIDRegistrarApiMarshallerImpl
}
import io.iohk.atala.agent.server.http.service.{
ConnectionsManagementApiServiceImpl,
DIDApiServiceImpl,
DIDAuthenticationApiServiceImpl,
DIDRegistrarApiServiceImpl,
ConnectionsManagementApiServiceImpl
DIDRegistrarApiServiceImpl
}
import io.iohk.atala.agent.openapi.api.{DIDApi, DIDAuthenticationApi, DIDRegistrarApi, ConnectionsManagementApi}
import io.iohk.atala.agent.openapi.api.{ConnectionsManagementApi, DIDApi, DIDAuthenticationApi, DIDRegistrarApi}
import cats.effect.std.Dispatcher
import com.typesafe.config.ConfigFactory
import doobie.util.transactor.Transactor
Expand All @@ -39,9 +39,9 @@ import io.iohk.atala.iris.proto.service.IrisServiceGrpc.IrisServiceStub
import io.iohk.atala.pollux.core.repository.CredentialRepository
import io.iohk.atala.pollux.core.service.CredentialService
import io.iohk.atala.pollux.sql.repository.JdbcCredentialRepository
import io.iohk.atala.pollux.sql.repository.{DbConfig => PolluxDbConfig}
import io.iohk.atala.connect.sql.repository.{DbConfig => ConnectDbConfig}
import io.iohk.atala.agent.server.sql.{DbConfig => AgentDbConfig}
import io.iohk.atala.pollux.sql.repository.DbConfig as PolluxDbConfig
import io.iohk.atala.connect.sql.repository.DbConfig as ConnectDbConfig
import io.iohk.atala.agent.server.sql.DbConfig as AgentDbConfig
import io.iohk.atala.agent.server.jobs.*
import zio.*
import zio.config.typesafe.TypesafeConfigSource
Expand Down Expand Up @@ -79,7 +79,7 @@ import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.pollux.schema.{SchemaRegistryServerEndpoints, VerificationPolicyServerEndpoints}
import io.iohk.atala.pollux.service.{SchemaRegistryServiceInMemory, VerificationPolicyServiceInMemory}
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.mercury.protocol.presentproof._
import io.iohk.atala.mercury.protocol.presentproof.*
import io.iohk.atala.agent.server.config.AgentConfig
import org.didcommx.didcomm.DIDComm
import io.iohk.atala.resolvers.UniversalDidResolver
Expand All @@ -88,7 +88,7 @@ import org.didcommx.didcomm.model.UnpackParams
import org.didcommx.didcomm.secret.Secret
import io.circe.ParsingFailure
import io.circe.DecodingFailure
import io.iohk.atala.agent.walletapi.sql.JdbcDIDSecretStorage
import io.iohk.atala.agent.walletapi.sql.{JdbcDIDNonSecretStorage, JdbcDIDSecretStorage}
import io.iohk.atala.resolvers.DIDResolver
import io.iohk.atala.agent.walletapi.storage.DIDSecretStorage
import io.iohk.atala.pollux.vc.jwt.{DidResolver => JwtDidResolver}
Expand Down Expand Up @@ -175,6 +175,12 @@ object Modules {
.repeat(Schedule.spaced(10.seconds))
.unit

val syncDIDPublicationStateFromDltJob: URIO[ManagedDIDService, Unit] =
BackgroundJobs.syncDIDPublicationStateFromDlt
.catchAll(e => ZIO.logError(s"error while syncing DID publication state: $e"))
.repeat(Schedule.spaced(10.seconds))
.unit

private[this] def extractFirstRecipientDid(jsonMessage: String): IO[ParsingFailure | DecodingFailure, String] = {
import io.circe._, io.circe.parser._
val doc = parse(jsonMessage).getOrElse(Json.Null)
Expand Down Expand Up @@ -409,8 +415,11 @@ object AppModule {
val didServiceLayer: TaskLayer[DIDService] =
(didOpValidatorLayer ++ GrpcModule.layers) >>> DIDServiceImpl.layer

val manageDIDServiceLayer: TaskLayer[ManagedDIDService] =
(didOpValidatorLayer ++ didServiceLayer ++ (RepoModule.agentTransactorLayer >>> JdbcDIDSecretStorage.layer)) >>> ManagedDIDService.layer
val manageDIDServiceLayer: TaskLayer[ManagedDIDService] = {
val secretStorageLayer = RepoModule.agentTransactorLayer >>> JdbcDIDSecretStorage.layer
val nonSecretStorageLayer = RepoModule.agentTransactorLayer >>> JdbcDIDNonSecretStorage.layer
(didOpValidatorLayer ++ didServiceLayer ++ secretStorageLayer ++ nonSecretStorageLayer) >>> ManagedDIDService.layer
}

val credentialServiceLayer: RLayer[DidComm, CredentialService] =
(GrpcModule.layers ++ RepoModule.credentialRepoLayer) >>> CredentialServiceImpl.layer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.iohk.atala.agent.server.http.model

import akka.http.scaladsl.server.StandardRoute
import io.iohk.atala.agent.openapi.model.ErrorResponse
import io.iohk.atala.agent.walletapi.model.error.{CreateManagedDIDError, ListManagedDIDError, PublishManagedDIDError}
import io.iohk.atala.agent.walletapi.model.error.{CreateManagedDIDError, GetManagedDIDError, PublishManagedDIDError}
import io.iohk.atala.castor.core.model.did.w3c.DIDResolutionErrorRepr
import io.iohk.atala.castor.core.model.error.DIDOperationError
import io.iohk.atala.castor.core.model.error.DIDResolutionError
Expand Down Expand Up @@ -46,8 +46,8 @@ trait OASErrorModelHelper {
}
}

given ToErrorResponse[ListManagedDIDError] with {
override def toErrorResponse(e: ListManagedDIDError): ErrorResponse = {
given ToErrorResponse[GetManagedDIDError] with {
override def toErrorResponse(e: GetManagedDIDError): ErrorResponse = {
ErrorResponse(
`type` = "error-type",
title = "error-title",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,4 +445,6 @@ object BackgroundJobs {
ZIO.unit
}

val syncDIDPublicationStateFromDlt = ZIO.serviceWithZIO[ManagedDIDService](_.syncManagedDIDState)

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package io.iohk.atala.agent.walletapi.model.error

import io.iohk.atala.castor.core.model.error.DIDOperationError

sealed trait ListManagedDIDError
sealed trait GetManagedDIDError

object ListManagedDIDError {
final case class WalletStorageError(cause: Throwable) extends ListManagedDIDError
final case class OperationError(cause: DIDOperationError) extends ListManagedDIDError
object GetManagedDIDError {
final case class WalletStorageError(cause: Throwable) extends GetManagedDIDError
final case class OperationError(cause: DIDOperationError) extends GetManagedDIDError
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.iohk.atala.agent.walletapi.model

import io.iohk.atala.castor.core.model.did.PrismDID
import io.iohk.atala.castor.core.model.error.DIDOperationError

package object error {
final case class CommonWalletStorageError(cause: Throwable)

given Conversion[CommonWalletStorageError, PublishManagedDIDError] = e =>
PublishManagedDIDError.WalletStorageError(e.cause)
given Conversion[DIDOperationError, PublishManagedDIDError] = PublishManagedDIDError.OperationError(_)

given Conversion[CommonWalletStorageError, GetManagedDIDError] = e => GetManagedDIDError.WalletStorageError(e.cause)
given Conversion[DIDOperationError, GetManagedDIDError] = GetManagedDIDError.OperationError(_)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ import io.iohk.atala.agent.walletapi.model.{
ManagedDIDTemplate
}
import io.iohk.atala.agent.walletapi.model.ECCoordinates.*
import io.iohk.atala.agent.walletapi.model.error.{
CreateManagedDIDError,
ListManagedDIDError,
PublishManagedDIDError,
DIDSecretStorageError
}
import io.iohk.atala.agent.walletapi.model.error.{given, *}
import io.iohk.atala.agent.walletapi.service.ManagedDIDService.{CreateDIDSecret, DEFAULT_MASTER_KEY_ID}
import io.iohk.atala.agent.walletapi.storage.{
DIDNonSecretStorage,
Expand Down Expand Up @@ -66,51 +61,28 @@ final class ManagedDIDService private[walletapi] (
private val AGREEMENT_KEY_ID = "agreement"
private val AUTHENTICATION_KEY_ID = "authentication"

def listManagedDID: IO[ListManagedDIDError, Seq[ManagedDIDDetail]] = nonSecretStorage.listManagedDID
.mapBoth(
ListManagedDIDError.WalletStorageError.apply,
_.toSeq.map { case (did, state) =>
ManagedDIDDetail(did = did.asCanonical, state = state)
}
)
.flatMap { dids =>
ZIO.foreach(dids) { didDetail =>
// state in wallet maybe stale, update it from DLT
syncDIDStateFromDLT(didDetail.state)
.mapError(ListManagedDIDError.OperationError.apply)
.tap(state =>
nonSecretStorage
.setManagedDIDState(didDetail.did, state)
.mapError(ListManagedDIDError.WalletStorageError.apply)
)
.map(didDetail -> _)
}
def syncManagedDIDState: IO[GetManagedDIDError, Unit] = nonSecretStorage.listManagedDID
.mapError(GetManagedDIDError.WalletStorageError.apply)
.flatMap { kv =>
ZIO.foreach(kv.keys.map(_.asCanonical))(computeNewDIDStateFromDLTAndPersist[GetManagedDIDError])
}
.map(_.map { case (didDetail, newState) => didDetail.copy(state = newState) })
.unit

def publishStoredDID(did: CanonicalPrismDID): IO[PublishManagedDIDError, ScheduleDIDOperationOutcome] = {
def syncDLTStateAndPersist =
nonSecretStorage
.getManagedDIDState(did)
.mapError(PublishManagedDIDError.WalletStorageError.apply)
.flatMap(state => ZIO.fromOption(state).mapError(_ => PublishManagedDIDError.DIDNotFound(did)))
.flatMap(state => syncDIDStateFromDLT(state).mapError(PublishManagedDIDError.OperationError.apply))
.tap(state =>
nonSecretStorage.setManagedDIDState(did, state).mapError(PublishManagedDIDError.WalletStorageError.apply)
)
def listManagedDID: IO[GetManagedDIDError, Seq[ManagedDIDDetail]] =
for {
_ <- syncManagedDIDState // state in wallet maybe stale, update it from DLT
dids <- nonSecretStorage.listManagedDID.mapError(GetManagedDIDError.WalletStorageError.apply)
} yield dids.toSeq.map { case (did, state) => ManagedDIDDetail(did.asCanonical, state) }

def publishStoredDID(did: CanonicalPrismDID): IO[PublishManagedDIDError, ScheduleDIDOperationOutcome] = {
def submitOperation(operation: PrismDIDOperation.Create) =
for {
masterKeyPair <-
secretStorage
.getKey(did, DEFAULT_MASTER_KEY_ID)
.mapError(PublishManagedDIDError.WalletStorageError.apply)
.flatMap(maybeKey =>
ZIO
.fromOption(maybeKey)
.orDieWith(_ =>
new Exception("master-key must exists in the wallet for DID publication operation signature")
)
.someOrElseZIO(
ZIO.die(Exception("master-key must exists in the wallet for DID publication operation signature"))
)
signedAtalaOperation <- ZIO
.fromTry(
Expand All @@ -133,7 +105,11 @@ final class ManagedDIDService private[walletapi] (
} yield outcome

for {
didState <- syncDLTStateAndPersist
_ <- computeNewDIDStateFromDLTAndPersist[PublishManagedDIDError](did)
didState <- nonSecretStorage
.getManagedDIDState(did)
.mapError(PublishManagedDIDError.WalletStorageError.apply)
.someOrFail(PublishManagedDIDError.DIDNotFound(did))
outcome <- didState match {
case ManagedDIDState.Created(operation) => submitOperation(operation)
case ManagedDIDState.PublicationPending(operation, operationId) =>
Expand Down Expand Up @@ -217,8 +193,25 @@ final class ManagedDIDService private[walletapi] (
y = Base64UrlString.fromByteArray(keyPair.publicKey.p.y.toPaddedByteArray(CURVE))
)

/** Reconcile state with DLT and return a correct status */
private def syncDIDStateFromDLT(state: ManagedDIDState): IO[DIDOperationError, ManagedDIDState] = {
/** Reconcile state with DLT and write new state to the storage */
private def computeNewDIDStateFromDLTAndPersist[E](
did: CanonicalPrismDID
)(using
c1: Conversion[CommonWalletStorageError, E],
c2: Conversion[DIDOperationError, E]
): IO[E, Unit] = {
nonSecretStorage
.getManagedDIDState(did)
.mapError[E](CommonWalletStorageError.apply)
.flatMap(maybeState => ZIO.foreach(maybeState)(computeNewDIDStateFromDLT(_).mapError[E](e => e)))
.flatMap(maybeState =>
ZIO.foreach(maybeState)(nonSecretStorage.setManagedDIDState(did, _)).mapError[E](CommonWalletStorageError.apply)
)
.unit
}

/** Reconcile state with DLT and return an updated state */
private def computeNewDIDStateFromDLT(state: ManagedDIDState): IO[DIDOperationError, ManagedDIDState] = {
state match {
case s @ ManagedDIDState.PublicationPending(operation, operationId) =>
didService
Expand Down Expand Up @@ -277,7 +270,7 @@ object ManagedDIDService {
ManagedDIDService(_, _, _, _)
)

val layer: URLayer[DIDOperationValidator & DIDService & DIDSecretStorage, ManagedDIDService] =
InMemoryDIDNonSecretStorage.layer >>> ZLayer.fromFunction(ManagedDIDService(_, _, _, _))
val layer: URLayer[DIDOperationValidator & DIDService & DIDSecretStorage & DIDNonSecretStorage, ManagedDIDService] =
ZLayer.fromFunction(ManagedDIDService(_, _, _, _))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.iohk.atala.agent.walletapi.sql

import doobie.*
import doobie.implicits.*
import doobie.postgres.implicits.*

import io.iohk.atala.agent.walletapi.model.ManagedDIDState
import io.iohk.atala.agent.walletapi.storage.DIDNonSecretStorage
import io.iohk.atala.castor.core.model.did.PrismDID
import zio.*
import zio.interop.catz.*

class JdbcDIDNonSecretStorage(xa: Transactor[Task]) extends DIDNonSecretStorage {

override def getManagedDIDState(did: PrismDID): Task[Option[ManagedDIDState]] = {
val cxnIO =
sql"""
| SELECT
| did,
| publication_status,
| atala_operation_content,
| publish_operation_id
| FROM public.did_publication_state
| WHERE did = $did
""".stripMargin
.query[DIDPublicationStateRow]
.option

cxnIO
.transact(xa)
.flatMap(_.map(_.toDomain).fold(ZIO.none)(t => ZIO.fromTry(t).asSome))
}

override def setManagedDIDState(did: PrismDID, state: ManagedDIDState): Task[Unit] = {
val row = DIDPublicationStateRow.from(did, state)
val cxnIO = sql"""
| INSERT INTO public.did_publication_state(
| did,
| publication_status,
| atala_operation_content,
| publish_operation_id
| )
| VALUES (
| ${row.did},
| ${row.publicationStatus},
| ${row.atalaOperationContent},
| ${row.publishOperationId}
| )
| ON CONFLICT (did) DO UPDATE SET
| publication_status = EXCLUDED.publication_status,
| atala_operation_content = EXCLUDED.atala_operation_content,
| publish_operation_id = EXCLUDED.publish_operation_id
""".stripMargin.update

cxnIO.run.transact(xa).unit
}

override def listManagedDID: Task[Map[PrismDID, ManagedDIDState]] = {
val cxnIO =
sql"""
| SELECT
| did,
| publication_status,
| atala_operation_content,
| publish_operation_id
| FROM public.did_publication_state
""".stripMargin
.query[DIDPublicationStateRow]
.to[List]

cxnIO
.transact(xa)
.map(_.map(row => row.toDomain.map(row.did -> _)))
.flatMap(ls => ZIO.foreach(ls)(ZIO.fromTry[(PrismDID, ManagedDIDState)](_)))
.map(_.toMap)
}

}

object JdbcDIDNonSecretStorage {
val layer: URLayer[Transactor[Task], DIDNonSecretStorage] = ZLayer.fromFunction(new JdbcDIDNonSecretStorage(_))
}
Loading