diff --git a/build.sbt b/build.sbt index 5813081494..1591355ae9 100644 --- a/build.sbt +++ b/build.sbt @@ -150,7 +150,7 @@ lazy val D_Connect = new { // Dependency Modules private lazy val baseDependencies: Seq[ModuleID] = - Seq(D.zio, D.zioTest, D.zioTestSbt, D.zioTestMagnolia, D.testcontainersPostgres, logback) + Seq(D.zio, D.zioTest, D.zioTestSbt, D.zioTestMagnolia, D.zioMock, D.testcontainersPostgres, logback) // Project Dependencies lazy val coreDependencies: Seq[ModuleID] = @@ -223,6 +223,7 @@ lazy val D_Pollux = new { D.zioTest, D.zioTestSbt, D.zioTestMagnolia, + D.zioMock, D.munit, D.munitZio, prismCrypto, @@ -280,6 +281,17 @@ lazy val D_Pollux_VC_JWT = new { lazy val polluxVcJwtDependencies: Seq[ModuleID] = baseDependencies } +lazy val D_EventNotification = new { + val zio = "dev.zio" %% "zio" % V.zio + val zioConcurrent = "dev.zio" %% "zio-concurrent" % V.zio + val zioTest = "dev.zio" %% "zio-test" % V.zio % Test + val zioTestSbt = "dev.zio" %% "zio-test-sbt" % V.zio % Test + val zioTestMagnolia = "dev.zio" %% "zio-test-magnolia" % V.zio % Test + + val zioDependencies: Seq[ModuleID] = Seq(zio, zioConcurrent, zioTest, zioTestSbt, zioTestMagnolia) + val baseDependencies: Seq[ModuleID] = zioDependencies +} + lazy val D_PrismAgent = new { // Added here to make prism-crypto works. @@ -632,7 +644,7 @@ lazy val polluxCore = project ) .dependsOn(shared) .dependsOn(polluxVcJWT) - .dependsOn(protocolIssueCredential, protocolPresentProof, resolver, agentDidcommx) + .dependsOn(protocolIssueCredential, protocolPresentProof, resolver, agentDidcommx, eventNotification) lazy val polluxDoobie = project .in(file("pollux/lib/sql-doobie")) @@ -687,7 +699,7 @@ lazy val connectCore = project Test / publishArtifact := true ) .dependsOn(shared) - .dependsOn(protocolConnection, protocolReportProblem) + .dependsOn(protocolConnection, protocolReportProblem, eventNotification) lazy val connectDoobie = project .in(file("connect/lib/sql-doobie")) @@ -699,6 +711,17 @@ lazy val connectDoobie = project .dependsOn(shared) .dependsOn(connectCore % "compile->compile;test->test") +// ############################ +// #### Event Notification #### +// ############################ + +lazy val eventNotification = project + .in(file("event-notification")) + .settings( + name := "event-notification", + libraryDependencies ++= D_EventNotification.baseDependencies + ) + // ##################### // #### Prism Agent #### // ##################### @@ -711,8 +734,11 @@ lazy val prismAgentWalletAPI = project name := "prism-agent-wallet-api", libraryDependencies ++= D_PrismAgent.keyManagementDependencies ) - .dependsOn(agentDidcommx) - .dependsOn(castorCore) + .dependsOn( + agentDidcommx, + castorCore, + eventNotification + ) lazy val prismAgentServer = project .in(file("prism-agent/service/server")) @@ -741,7 +767,8 @@ lazy val prismAgentServer = project polluxAnoncreds, connectCore, connectDoobie, - castorCore + castorCore, + eventNotification ) // ################## @@ -822,6 +849,7 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq( prismAgentWalletAPI, prismAgentServer, mediator, + eventNotification, ) lazy val root = project diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala new file mode 100644 index 0000000000..d6676c7e5f --- /dev/null +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala @@ -0,0 +1,89 @@ +package io.iohk.atala.connect.core.service + +import io.iohk.atala.connect.core.model.ConnectionRecord +import io.iohk.atala.connect.core.model.error.ConnectionServiceError +import io.iohk.atala.event.notification.{Event, EventNotificationService} +import io.iohk.atala.mercury.model.DidId +import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse} +import zio.{IO, URLayer, ZIO, ZLayer} + +import java.util.UUID + +class ConnectionServiceNotifier( + svc: ConnectionService, + eventNotificationService: EventNotificationService +) extends ConnectionService { + + private val connectionUpdatedEvent = "ConnectionUpdated" + + override def createConnectionInvitation( + label: Option[String], + pairwiseDID: DidId + ): IO[ConnectionServiceError, ConnectionRecord] = + notifyOnSuccess(svc.createConnectionInvitation(label, pairwiseDID)) + + override def receiveConnectionInvitation(invitation: String): IO[ConnectionServiceError, ConnectionRecord] = + notifyOnSuccess(svc.receiveConnectionInvitation(invitation)) + + override def acceptConnectionInvitation( + recordId: UUID, + pairwiseDid: DidId + ): IO[ConnectionServiceError, ConnectionRecord] = + notifyOnSuccess(svc.acceptConnectionInvitation(recordId, pairwiseDid)) + + override def markConnectionRequestSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] = + notifyOnSuccess(svc.markConnectionRequestSent(recordId)) + + override def receiveConnectionRequest(request: ConnectionRequest): IO[ConnectionServiceError, ConnectionRecord] = + notifyOnSuccess(svc.receiveConnectionRequest(request)) + + override def acceptConnectionRequest(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] = + notifyOnSuccess(svc.acceptConnectionRequest(recordId)) + + override def markConnectionResponseSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] = + notifyOnSuccess(svc.markConnectionResponseSent(recordId)) + + override def receiveConnectionResponse(response: ConnectionResponse): IO[ConnectionServiceError, ConnectionRecord] = + notifyOnSuccess(svc.receiveConnectionResponse(response)) + + private[this] def notifyOnSuccess(effect: IO[ConnectionServiceError, ConnectionRecord]) = + for { + record <- effect + _ <- notify(record) + } yield record + + private[this] def notify(record: ConnectionRecord) = { + val result = for { + producer <- eventNotificationService.producer[ConnectionRecord]("Connect") + _ <- producer.send(Event(connectionUpdatedEvent, record)) + } yield () + result.catchAll(e => ZIO.logError(s"Notification service error: $e")) + } + + override def getConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Option[ConnectionRecord]] = + svc.getConnectionRecord(recordId) + + override def getConnectionRecordByThreadId(thid: String): IO[ConnectionServiceError, Option[ConnectionRecord]] = + svc.getConnectionRecordByThreadId(thid) + + override def deleteConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Int] = + svc.deleteConnectionRecord(recordId) + + override def reportProcessingFailure(recordId: UUID, failReason: Option[String]): IO[ConnectionServiceError, Unit] = + svc.reportProcessingFailure(recordId, failReason) + + override def getConnectionRecords(): IO[ConnectionServiceError, Seq[ConnectionRecord]] = + svc.getConnectionRecords() + + override def getConnectionRecordsByStates( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ConnectionRecord.ProtocolState* + ): IO[ConnectionServiceError, Seq[ConnectionRecord]] = + svc.getConnectionRecordsByStates(ignoreWithZeroRetries, limit, states: _*) +} + +object ConnectionServiceNotifier { + val layer: URLayer[ConnectionService & EventNotificationService, ConnectionService] = + ZLayer.fromFunction(ConnectionServiceNotifier(_, _)) +} diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala new file mode 100644 index 0000000000..d5e3f005ef --- /dev/null +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala @@ -0,0 +1,80 @@ +package io.iohk.atala.connect.core.service + +import io.iohk.atala.connect.core.model.ConnectionRecord +import io.iohk.atala.connect.core.model.error.ConnectionServiceError +import io.iohk.atala.mercury.model.DidId +import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse} +import zio.mock.{Mock, Proxy} +import zio.{IO, URLayer, ZIO, ZLayer, mock} + +import java.util.UUID + +object MockConnectionService extends Mock[ConnectionService] { + + object CreateConnectionInvitation extends Effect[(Option[String], DidId), ConnectionServiceError, ConnectionRecord] + object ReceiveConnectionInvitation extends Effect[String, ConnectionServiceError, ConnectionRecord] + object AcceptConnectionInvitation extends Effect[(UUID, DidId), ConnectionServiceError, ConnectionRecord] + object MarkConnectionRequestSent extends Effect[UUID, ConnectionServiceError, ConnectionRecord] + object ReceiveConnectionRequest extends Effect[ConnectionRequest, ConnectionServiceError, ConnectionRecord] + object AcceptConnectionRequest extends Effect[UUID, ConnectionServiceError, ConnectionRecord] + object MarkConnectionResponseSent extends Effect[UUID, ConnectionServiceError, ConnectionRecord] + object ReceiveConnectionResponse extends Effect[ConnectionResponse, ConnectionServiceError, ConnectionRecord] + + override val compose: URLayer[mock.Proxy, ConnectionService] = ZLayer { + for { + proxy <- ZIO.service[Proxy] + } yield new ConnectionService { + override def createConnectionInvitation( + label: Option[String], + pairwiseDID: DidId + ): IO[ConnectionServiceError, ConnectionRecord] = + proxy(CreateConnectionInvitation, label, pairwiseDID) + + override def receiveConnectionInvitation(invitation: String): IO[ConnectionServiceError, ConnectionRecord] = + proxy(ReceiveConnectionInvitation, invitation) + + override def acceptConnectionInvitation( + recordId: UUID, + pairwiseDid: DidId + ): IO[ConnectionServiceError, ConnectionRecord] = + proxy(AcceptConnectionInvitation, recordId, pairwiseDid) + + override def markConnectionRequestSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] = + proxy(MarkConnectionRequestSent, recordId) + + override def receiveConnectionRequest(request: ConnectionRequest): IO[ConnectionServiceError, ConnectionRecord] = + proxy(ReceiveConnectionRequest, request) + + override def acceptConnectionRequest(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] = + proxy(AcceptConnectionRequest, recordId) + + override def markConnectionResponseSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] = + proxy(MarkConnectionResponseSent, recordId) + + override def receiveConnectionResponse( + response: ConnectionResponse + ): IO[ConnectionServiceError, ConnectionRecord] = + proxy(ReceiveConnectionResponse, response) + + override def getConnectionRecords(): IO[ConnectionServiceError, Seq[ConnectionRecord]] = ??? + + override def getConnectionRecordsByStates( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ConnectionRecord.ProtocolState* + ): IO[ConnectionServiceError, Seq[ConnectionRecord]] = ??? + + override def getConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Option[ConnectionRecord]] = ??? + + override def getConnectionRecordByThreadId(thid: String): IO[ConnectionServiceError, Option[ConnectionRecord]] = + ??? + + override def deleteConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Int] = ??? + + override def reportProcessingFailure( + recordId: UUID, + failReason: Option[String] + ): IO[ConnectionServiceError, Unit] = ??? + } + } +} diff --git a/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifierSpec.scala b/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifierSpec.scala new file mode 100644 index 0000000000..955e23969e --- /dev/null +++ b/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifierSpec.scala @@ -0,0 +1,139 @@ +package io.iohk.atala.connect.core.service + +import io.iohk.atala.connect.core.model.ConnectionRecord +import io.iohk.atala.connect.core.model.ConnectionRecord.ProtocolState +import io.iohk.atala.connect.core.repository.ConnectionRepositoryInMemory +import io.iohk.atala.event.notification.* +import io.iohk.atala.mercury.model.DidId +import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse} +import io.iohk.atala.mercury.protocol.invitation.v2.Invitation +import zio.* +import zio.ZIO.* +import zio.mock.Expectation +import zio.test.* + +import java.time.Instant +import java.util.UUID + +object ConnectionServiceNotifierSpec extends ZIOSpecDefault { + + private val record = ConnectionRecord( + UUID.randomUUID(), + Instant.now, + None, + UUID.randomUUID().toString, + None, + ConnectionRecord.Role.Inviter, + ProtocolState.InvitationGenerated, + Invitation(from = DidId("did:peer:INVITER"), Invitation.Body("", "", Nil)), + None, + None, + 5, + None, + None + ) + + private val inviterExpectations = + MockConnectionService.CreateConnectionInvitation( + assertion = Assertion.anything, + result = Expectation.value(record) + ) ++ MockConnectionService.ReceiveConnectionRequest( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.ConnectionRequestReceived)) + ) ++ MockConnectionService.AcceptConnectionRequest( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.ConnectionResponsePending)) + ) ++ MockConnectionService.MarkConnectionResponseSent( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.ConnectionResponseSent)) + ) + + private val inviteeExpectations = + MockConnectionService.ReceiveConnectionInvitation( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.InvitationReceived)) + ) ++ MockConnectionService.AcceptConnectionInvitation( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.ConnectionRequestPending)) + ) ++ MockConnectionService.MarkConnectionRequestSent( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.ConnectionRequestSent)) + ) ++ MockConnectionService.ReceiveConnectionResponse( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.ConnectionResponseReceived)) + ) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + suite("ConnectionServiceWithEventNotificationImpl")( + test("should send relevant events during flow execution on the inviter side") { + for { + cs <- ZIO.service[ConnectionService] + ens <- ZIO.service[EventNotificationService] + did = DidId("did:peer:INVITER") + connectionRecord <- cs.createConnectionInvitation(Some("test"), did) + _ <- cs.receiveConnectionRequest( + ConnectionRequest( + from = DidId("did:peer:INVITER"), + to = DidId("did:peer:INVITEE"), + thid = Some(connectionRecord.thid), + pthid = None, + body = ConnectionRequest.Body() + ) + ) + _ <- cs.acceptConnectionRequest(connectionRecord.id) + _ <- cs.markConnectionResponseSent(connectionRecord.id) + consumer <- ens.consumer[ConnectionRecord]("Connect") + events <- consumer.poll(50) + } yield { + assertTrue(events.size == 4) && + assertTrue(events.head.data.protocolState == ProtocolState.InvitationGenerated) && + assertTrue(events(1).data.protocolState == ProtocolState.ConnectionRequestReceived) && + assertTrue(events(2).data.protocolState == ProtocolState.ConnectionResponsePending) && + assertTrue(events(3).data.protocolState == ProtocolState.ConnectionResponseSent) + } + }.provide( + ZLayer.succeed(50) >>> EventNotificationServiceImpl.layer, + inviterExpectations.toLayer >>> ConnectionServiceNotifier.layer + ), + test("should send relevant events during flow execution on the invitee side") { + for { + inviterSvc <- ZIO + .service[ConnectionService] + .provideLayer(ConnectionRepositoryInMemory.layer >>> ConnectionServiceImpl.layer) + inviterDID = DidId("did:peer:INVITER") + inviterRecord <- inviterSvc.createConnectionInvitation( + Some("Test connection invitation"), + inviterDID + ) + inviteeSvc <- ZIO.service[ConnectionService] + inviteeDID = DidId("did:peer:INVITEE") + ens <- ZIO.service[EventNotificationService] + connectionRecord <- inviteeSvc.receiveConnectionInvitation(inviterRecord.invitation.toBase64) + _ <- inviteeSvc.acceptConnectionInvitation(connectionRecord.id, inviteeDID) + _ <- inviteeSvc.markConnectionRequestSent(connectionRecord.id) + _ <- inviteeSvc.receiveConnectionResponse( + ConnectionResponse( + from = inviterDID, + to = inviteeDID, + thid = Some(connectionRecord.thid), + pthid = None, + body = ConnectionResponse.Body() + ) + ) + consumer <- ens.consumer[ConnectionRecord]("Connect") + events <- consumer.poll(50) + } yield { + assertTrue(events.size == 4) && + assertTrue(events.head.data.protocolState == ProtocolState.InvitationReceived) && + assertTrue(events(1).data.protocolState == ProtocolState.ConnectionRequestPending) && + assertTrue(events(2).data.protocolState == ProtocolState.ConnectionRequestSent) && + assertTrue(events(3).data.protocolState == ProtocolState.ConnectionResponseReceived) + } + }.provide( + ZLayer.succeed(50) >>> EventNotificationServiceImpl.layer, + inviteeExpectations.toLayer >>> ConnectionServiceNotifier.layer + ) + ) + } + +} diff --git a/docs/docusaurus/sidebars.js b/docs/docusaurus/sidebars.js index e84b07b37d..8d9c7e5eef 100644 --- a/docs/docusaurus/sidebars.js +++ b/docs/docusaurus/sidebars.js @@ -65,6 +65,13 @@ const sidebars = { 'secrets/operation', 'secrets/seed-generation' ] + }, + { + type: 'category', + label: 'Webhooks', + items: [ + 'webhooks/webhook', + ] } ] } diff --git a/docs/docusaurus/webhooks/webhook.md b/docs/docusaurus/webhooks/webhook.md new file mode 100644 index 0000000000..123acb9df4 --- /dev/null +++ b/docs/docusaurus/webhooks/webhook.md @@ -0,0 +1,164 @@ +# Webhook Notifications + +## Introduction + +Welcome to the tutorial on webhook notifications in PRISM Agent. In this tutorial, we will explore how webhook +notifications can enhance your experience with PRISM Agent by providing real-time updates on events. By leveraging +webhook notifications, you can stay informed about important changes happening within the agent. + +## Understanding Webhook Notifications + +### What are Webhooks? + +Webhooks enable real-time communication between applications by sending HTTP requests containing event data to specified +endpoints (webhook URLs) when events occur. They establish a direct communication channel, allowing applications to +receive instant updates and respond in a timely manner, promoting efficient integration between event-driven +systems. + +### Purpose of Webhook Notifications in PRISM Agent + +Webhook notifications in PRISM Agent serve as a vital feature, enabling you to receive timely updates on various events +occurring within the agent. Webhooks allow you to receive HTTP requests containing event details at a specified +endpoint (webhook URL). These events are specifically related to the execution of +the [Connect](/tutorials/connections/connection), [Issue](/tutorials/credentials/issue), +and [Presentation](/tutorials/credentials/present-proof) flows. Webhook notifications will be sent each time there is a +state +change during the execution of these protocols. + +By leveraging webhooks, you can integrate PRISM Agent seamlessly into your applications and systems. You can track and +monitor the progress of the main flows, receiving timely updates about changes and events. + +## Configuring the Webhook Feature + +### Enabling the Webhook Feature + +PRISM Agent uses the following environment variables to manage webhook notifications: + +| Name | Description | Default | +|-------------------|--------------------------------------------------------------------------|---------| +| `WEBHOOK_URL` | The webhook endpoint URL where the notifications will be sent | null | +| `WEBHOOK_API_KEY` | The optional API key (bearer token) to use as the `Authorization` header | null | + +### Securing the Webhook Endpoint + +It is essential to secure the webhook endpoint to protect the integrity and confidentiality of the event data. Consider +the following best practices when securing your webhook endpoint: + +- Use HTTPS to encrypt communication between PRISM Agent and the webhook endpoint. +- Implement authentication mechanisms (e.g., API keys, tokens) to verify the authenticity of incoming requests. +- Validate and sanitize incoming webhook requests to mitigate potential security risks. + +The current supported authorization mechanism for PRISM Agent's webhook notifications is the bearer token. If +configured, the token will be included in the `Authorization` header of the HTTP request sent by the agent to the +webhook endpoint. You can configure this bearer token by setting the value of the `WEBHOOK_API_KEY` environment +variable. + +## Event Format and Types + +### Event Format + +Webhook notifications from PRISM Agent are sent as JSON payloads in the HTTP requests. + +The event format is consistent across all events. Each event follows a common structure, while the 'data' field +within the event payload contains information specific to the type of event. Here is an example of the JSON payload +format: + +The event payload typically includes relevant details about the specific event that occurred within the agent. Below is +an example of the JSON payload format: + +```json +{ + "id": "cb8d4e96-30f0-4892-863f-44d49d634211", + "ts": "2023-07-06T12:01:19.769427Z", + "eventType": "xxxx", + "data": { + // Event-specific data goes here + } +} +``` + +This event format ensures consistency and allows you to handle webhook notifications uniformly while easily extracting +the relevant data specific to each event type from the `data` field. + +Here is an example of a webhook notification event related to a connection flow state change (invitation generated): + +```json +{ + "id": "cb8d4e96-30f0-4892-863f-44d49d634211", + "ts": "2023-07-06T12:01:19.769427Z", + "eventType": "Connection", + "data": { + "connectionId": "c10787cf-99bb-47f4-99bb-1fdcca32b673", + "label": "Connect with Alice", + "role": "Inviter", + "state": "InvitationGenerated", + "invitation": { + "id": "c10787cf-99bb-47f4-99bb-1fdcca32b673", + "type": "https://didcomm.org/out-of-band/2.0/invitation", + "from": "did:peer:2.Ez6LS...jIiXX0", + "invitationUrl": "https://my.domain.com/path?_oob=eyJpZCI6...bXX19" + }, + "createdAt": "2023-07-06T12:01:19.760126Z", + "self": "c10787cf-99bb-47f4-99bb-1fdcca32b673", + "kind": "Connection" + } +} +``` + +### Common Event Types + +PRISM Agent sends webhook notifications for events related to protocol state changes in +the [Connect](/tutorials/connections/connection), [Issue](/tutorials/credentials/issue), +and [Presentation](/tutorials/credentials/present-proof) flows. These events allow you to track the progress and updates +within these flows in real-time. Some common event types that you can expect to receive through webhook notifications +include: + +- Connection State Change: Notifies about state changes in the connection flow, such as `InvitationGenerated`, + `ConnectionRequestSent`, `ConnectionResponseReceived`, etc. Please refer to the `state` field of + the [connection resource](/agent-api/#tag/Connections-Management/operation/getConnection) + for an exhaustive list of states. +- Credential State Change: Indicates changes in the credential issuance flow, such as `OfferSent`, `RequestReceived`, + `CredentialSent`, etc. Please refer to the `protocolState` field of + the [credential resource](/agent-api/#tag/Issue-Credentials-Protocol/operation/getCredentialRecord) + for an exhaustive list of states. +- Presentation State Change: Notifies about changes in the presentation flow, such as `RequestReceived`, + `PresentationGenerated`, `PresentationVerified`, etc. Please refer to the `status` field of + the [presentation resource](/agent-api/#tag/Present-Proof/operation/getPresentation) for an + exhaustive list of states. +- DID State Change: Notifies about DID-related state changes. Currently, only the `Published` DID publication state + event will be notified. + +## Processing Webhook Notifications + +### Handling Incoming Webhook Requests + +To handle incoming webhook notifications from PRISM Agent in your application, follow these general steps: + +1. Receive the HTTP request at your specified webhook endpoint. +2. Parse the JSON payload of the request to extract the event details. +3. Process the event data according to your application's requirements. +4. Send a response back to acknowledge the successful receipt of the webhook notification. For a successful reception, + the response status code should be `>= 200` and `< 300`. Any other response status code will lead to a new attempt + from the PRISM Agent. + +### Error Handling and Retry Mechanisms + +When working with webhook notifications in PRISM Agent, it is important to consider error handling and retry mechanisms. +In case of failed webhook notifications or errors, PRISM Agent employs an automatic retry mechanism to ensure delivery. +The agent will attempt to send the webhook notification up to three times, with a five-second interval between each +attempt. Please note that the number of retries and the interval duration are currently not configurable in PRISM Agent. + +By default, this retry mechanism provides a reasonable level of reliability for delivering webhook notifications, +allowing for temporary network issues or intermittent failures. + +## Conclusion + +Congratulations! You've learned about webhook notifications in PRISM Agent. By leveraging this feature, you can receive +real-time updates on events happening within the agent, enabling you to integrate PRISM Agent seamlessly into your +applications. Remember to secure your webhook endpoint and handle webhook notifications effectively to maximize the +benefits of this feature. + +Start integrating webhook notifications into your PRISM Agent workflow and unlock the power of real-time event updates! + +If you have any further questions or need assistance, don't hesitate to reach out to the PRISM Agent support team or +refer to the official documentation for more details. \ No newline at end of file diff --git a/event-notification/src/main/scala/io/iohk/atala/event/notification/Event.scala b/event-notification/src/main/scala/io/iohk/atala/event/notification/Event.scala new file mode 100644 index 0000000000..2ed89e8999 --- /dev/null +++ b/event-notification/src/main/scala/io/iohk/atala/event/notification/Event.scala @@ -0,0 +1,10 @@ +package io.iohk.atala.event.notification +import java.time.Instant +import java.util.UUID +import zio.IO + +case class Event[A](`type`: String, id: UUID, ts: Instant, data: A) + +object Event { + def apply[A](`type`: String, data: A): Event[A] = Event(`type`, UUID.randomUUID(), Instant.now(), data) +} diff --git a/event-notification/src/main/scala/io/iohk/atala/event/notification/EventConsumer.scala b/event-notification/src/main/scala/io/iohk/atala/event/notification/EventConsumer.scala new file mode 100644 index 0000000000..3b65858bd5 --- /dev/null +++ b/event-notification/src/main/scala/io/iohk/atala/event/notification/EventConsumer.scala @@ -0,0 +1,6 @@ +package io.iohk.atala.event.notification + +import zio.IO + +trait EventConsumer[A]: + def poll(count: Int): IO[EventNotificationServiceError, Seq[Event[A]]] diff --git a/event-notification/src/main/scala/io/iohk/atala/event/notification/EventNotificationService.scala b/event-notification/src/main/scala/io/iohk/atala/event/notification/EventNotificationService.scala new file mode 100644 index 0000000000..e3f590faf5 --- /dev/null +++ b/event-notification/src/main/scala/io/iohk/atala/event/notification/EventNotificationService.scala @@ -0,0 +1,7 @@ +package io.iohk.atala.event.notification + +import zio.IO + +trait EventNotificationService: + def consumer[A](topic: String): IO[EventNotificationServiceError, EventConsumer[A]] + def producer[A](topic: String): IO[EventNotificationServiceError, EventProducer[A]] diff --git a/event-notification/src/main/scala/io/iohk/atala/event/notification/EventNotificationServiceError.scala b/event-notification/src/main/scala/io/iohk/atala/event/notification/EventNotificationServiceError.scala new file mode 100644 index 0000000000..abee167f41 --- /dev/null +++ b/event-notification/src/main/scala/io/iohk/atala/event/notification/EventNotificationServiceError.scala @@ -0,0 +1,7 @@ +package io.iohk.atala.event.notification + +sealed trait EventNotificationServiceError + +object EventNotificationServiceError { + case class EventSendingFailed(msg: String) extends EventNotificationServiceError +} diff --git a/event-notification/src/main/scala/io/iohk/atala/event/notification/EventNotificationServiceImpl.scala b/event-notification/src/main/scala/io/iohk/atala/event/notification/EventNotificationServiceImpl.scala new file mode 100644 index 0000000000..55a138c8df --- /dev/null +++ b/event-notification/src/main/scala/io/iohk/atala/event/notification/EventNotificationServiceImpl.scala @@ -0,0 +1,50 @@ +package io.iohk.atala.event.notification + +import io.iohk.atala.event.notification.EventNotificationServiceError.EventSendingFailed +import zio.concurrent.ConcurrentMap +import zio.{IO, Queue, URLayer, ZIO, ZLayer} + +class EventNotificationServiceImpl(queueMap: ConcurrentMap[String, Queue[Event[_]]], queueCapacity: Int) + extends EventNotificationService: + + private[this] def getOrCreateQueue(topic: String): IO[EventNotificationServiceError, Queue[Event[_]]] = { + for { + maybeQueue <- queueMap.get(topic) + queue <- maybeQueue match + case Some(value) => ZIO.succeed(value) + case None => Queue.sliding(queueCapacity) + _ <- queueMap.put(topic, queue) + } yield queue + } + + override def consumer[A]( + topic: String + ): IO[EventNotificationServiceError, EventConsumer[A]] = + ZIO.succeed(new EventConsumer[A] { + override def poll(count: Int): IO[EventNotificationServiceError, Seq[Event[A]]] = for { + queue <- getOrCreateQueue(topic) + events <- queue.takeBetween(1, count) + decodedEvents <- ZIO.foreach(events)(e => ZIO.succeed(e.asInstanceOf[Event[A]])) + } yield decodedEvents + }) + + override def producer[A]( + topic: String + ): IO[EventNotificationServiceError, EventProducer[A]] = + ZIO.succeed(new EventProducer[A] { + override def send(event: Event[A]): IO[EventNotificationServiceError, Unit] = for { + queue <- getOrCreateQueue(topic) + succeeded <- queue.offer(event) + _ <- if (succeeded) ZIO.unit else ZIO.fail(EventSendingFailed("Queue.offer returned 'false'")) + } yield () + }) + +object EventNotificationServiceImpl { + val layer: URLayer[Int, EventNotificationServiceImpl] = + ZLayer.fromZIO( + for { + map <- ConcurrentMap.make[String, Queue[Event[_]]]() + capacity <- ZIO.service[Int] + } yield new EventNotificationServiceImpl(map, capacity) + ) +} diff --git a/event-notification/src/main/scala/io/iohk/atala/event/notification/EventProducer.scala b/event-notification/src/main/scala/io/iohk/atala/event/notification/EventProducer.scala new file mode 100644 index 0000000000..9fec9c2f8d --- /dev/null +++ b/event-notification/src/main/scala/io/iohk/atala/event/notification/EventProducer.scala @@ -0,0 +1,6 @@ +package io.iohk.atala.event.notification + +import zio.IO + +trait EventProducer[A]: + def send(event: Event[A]): IO[EventNotificationServiceError, Unit] diff --git a/event-notification/src/test/scala/io/iohk/atala/event/notification/EventNotificationServiceImplSpec.scala b/event-notification/src/test/scala/io/iohk/atala/event/notification/EventNotificationServiceImplSpec.scala new file mode 100644 index 0000000000..9e5dbf63f5 --- /dev/null +++ b/event-notification/src/test/scala/io/iohk/atala/event/notification/EventNotificationServiceImplSpec.scala @@ -0,0 +1,102 @@ +package io.iohk.atala.event.notification + +import zio.* +import zio.test.* + +object EventNotificationServiceImplSpec extends ZIOSpecDefault { + + private val eventNotificationServiceLayer = ZLayer.succeed(10) >>> EventNotificationServiceImpl.layer + + override def spec: Spec[TestEnvironment with Scope, Any] = { + suite("EventNotificationServiceImpl")( + test("should send events between a producer and a consumer of the same topic") { + for { + svc <- ZIO.service[EventNotificationService] + producer <- svc.producer[String]("TopicA") + consumer <- svc.consumer[String]("TopicA") + _ <- producer.send(Event("Foo", "event #1")) + _ <- producer.send(Event("Foo", "event #2")) + events <- consumer.poll(2) + } yield assertTrue(events.map(_.data) == Seq("event #1", "event #2")) + }, + test("should not mix-up events from different topics") { + for { + svc <- ZIO.service[EventNotificationService] + producerA <- svc.producer[String]("TopicA") + consumerA <- svc.consumer[String]("TopicA") + producerB <- svc.producer[String]("TopicB") + consumerB <- svc.consumer[String]("TopicB") + _ <- producerA.send(Event("Foo", "event #1")) + _ <- producerA.send(Event("Foo", "event #2")) + _ <- producerB.send(Event("Foo", "event #3")) + eventsA <- consumerA.poll(5) + eventsB <- consumerB.poll(5) + } yield { + assertTrue(eventsA.size == 2) && + assertTrue(eventsB.size == 1) && + assertTrue(eventsA.map(_.data) == Seq("event #1", "event #2")) && + assertTrue(eventsB.map(_.data) == Seq("event #3")) + } + }, + test("should only deliver the requested messages number to a consumer") { + for { + svc <- ZIO.service[EventNotificationService] + producer <- svc.producer[String]("TopicA") + consumer <- svc.consumer[String]("TopicA") + _ <- producer.send(Event("Foo", "event #1")) + _ <- producer.send(Event("Foo", "event #2")) + events <- consumer.poll(1) + } yield assertTrue(events.map(_.data) == Seq("event #1")) + }, + test("should remove consumed messages from the queue") { + for { + svc <- ZIO.service[EventNotificationService] + producer <- svc.producer[String]("TopicA") + consumer <- svc.consumer[String]("TopicA") + _ <- producer.send(Event("Foo", "event #1")) + _ <- producer.send(Event("Foo", "event #2")) + _ <- consumer.poll(1) + events <- consumer.poll(1) + } yield assertTrue(events.map(_.data) == Seq("event #2")) + }, + test("should send event even when consumer is created and polling first") { + for { + svc <- ZIO.service[EventNotificationService] + // Consuming in a fiber + consumer <- svc.consumer[String]("TopicA") + consumerFiber <- consumer.poll(1).fork + // Producing in another fiber, after 3 seconds + producer <- svc.producer[String]("TopicA") + producerFiber <- producer.send(Event("Foo", "event #1")).delay(3.seconds).fork + _ <- TestClock.adjust(3.seconds) + events <- consumerFiber.join + _ <- producerFiber.join + } yield assertTrue(events.map(_.data) == Seq("event #1")) + }, + test("should drop old items when sending new messages and queue is full") { + for { + svc <- ZIO.service[EventNotificationService] + producer <- svc.producer[String]("TopicA") + consumer <- svc.consumer[String]("TopicA") + _ <- ZIO.collectAll((1 to 10).map(i => producer.send(Event("Foo", s"event #$i")))) + _ <- producer.send(Event("Foo", "One more event")) + events <- consumer.poll(10) + } yield { + assertTrue(events.size == 10) && + assertTrue(events.head.data == "event #2") && + assertTrue(events(9).data == "One more event") + } + }, + test("should block on reading new messages when queue is empty") { + for { + svc <- ZIO.service[EventNotificationService] + consumer <- svc.consumer[String]("TopicA") + fiber <- consumer.poll(1).timeout(5.seconds).fork + _ <- TestClock.adjust(5.seconds) + res <- fiber.join + } yield assertTrue(res.isEmpty) + } + ) + }.provideLayer(eventNotificationServiceLayer) + +} diff --git a/infrastructure/shared/docker-compose.yml b/infrastructure/shared/docker-compose.yml index d3e3abcf02..f365a0271b 100644 --- a/infrastructure/shared/docker-compose.yml +++ b/infrastructure/shared/docker-compose.yml @@ -97,7 +97,11 @@ services: PRISM_NODE_PORT: 50053 VAULT_ADDR: ${VAULT_ADDR:-http://vault-server:8200} VAULT_TOKEN: ${VAULT_DEV_ROOT_TOKEN_ID:-root} - DEV_MODE: "true" + SECRET_STORAGE_BACKEND: postgres + DEV_MODE: true + WALLET_SEED: + WEBHOOK_URL: + WEBHOOK_API_KEY: depends_on: db: condition: service_healthy diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceNotifier.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceNotifier.scala new file mode 100644 index 0000000000..f613474ffa --- /dev/null +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceNotifier.scala @@ -0,0 +1,172 @@ +package io.iohk.atala.pollux.core.service + +import io.circe.Json +import io.iohk.atala.castor.core.model.did.CanonicalPrismDID +import io.iohk.atala.event.notification.* +import io.iohk.atala.mercury.model.DidId +import io.iohk.atala.mercury.protocol.issuecredential.{IssueCredential, OfferCredential, RequestCredential} +import io.iohk.atala.pollux.core.model.error.CredentialServiceError +import io.iohk.atala.pollux.core.model.{DidCommID, IssueCredentialRecord, PublishedBatchData} +import io.iohk.atala.pollux.vc.jwt.{Issuer, JWT, PresentationPayload, W3cCredentialPayload} +import io.iohk.atala.prism.crypto.MerkleInclusionProof +import zio.{IO, URLayer, ZIO, ZLayer} + +import java.time.Instant + +class CredentialServiceNotifier( + svc: CredentialService, + eventNotificationService: EventNotificationService +) extends CredentialService { + + private val issueCredentialRecordUpdatedEvent = "IssueCredentialRecordUpdated" + + override def createIssueCredentialRecord( + pairwiseIssuerDID: DidId, + pairwiseHolderDID: DidId, + thid: DidCommID, + maybeSchemaId: Option[_root_.java.lang.String], + claims: Json, + validityPeriod: Option[Double], + automaticIssuance: Option[Boolean], + awaitConfirmation: Option[Boolean], + issuingDID: Option[CanonicalPrismDID] + ): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess( + svc.createIssueCredentialRecord( + pairwiseIssuerDID, + pairwiseHolderDID, + thid, + maybeSchemaId, + claims, + validityPeriod, + automaticIssuance, + awaitConfirmation, + issuingDID + ) + ) + + override def markOfferSent(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess(svc.markOfferSent(recordId)) + + override def receiveCredentialOffer(offer: OfferCredential): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess(svc.receiveCredentialOffer(offer)) + + override def acceptCredentialOffer( + recordId: DidCommID, + subjectId: String + ): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess(svc.acceptCredentialOffer(recordId, subjectId)) + + override def generateCredentialRequest( + recordId: DidCommID, + signedPresentation: JWT + ): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess(svc.generateCredentialRequest(recordId, signedPresentation)) + + override def markRequestSent(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess(svc.markRequestSent(recordId)) + + override def receiveCredentialRequest(request: RequestCredential): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess(svc.receiveCredentialRequest(request)) + + override def acceptCredentialRequest(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess(svc.acceptCredentialRequest(recordId)) + + override def markCredentialGenerated( + recordId: DidCommID, + issueCredential: IssueCredential + ): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess(svc.markCredentialGenerated(recordId, issueCredential)) + + override def markCredentialSent(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess(svc.markCredentialSent(recordId)) + + override def receiveCredentialIssue(issue: IssueCredential): IO[CredentialServiceError, IssueCredentialRecord] = + notifyOnSuccess(svc.receiveCredentialIssue(issue)) + + private[this] def notifyOnSuccess(effect: IO[CredentialServiceError, IssueCredentialRecord]) = + for { + record <- effect + _ <- notify(record) + } yield record + + private[this] def notify(record: IssueCredentialRecord) = { + val result = for { + producer <- eventNotificationService.producer[IssueCredentialRecord]("Issue") + _ <- producer.send(Event(issueCredentialRecordUpdatedEvent, record)) + } yield () + result.catchAll(e => ZIO.logError(s"Notification service error: $e")) + } + + override def createPresentationPayload( + recordId: DidCommID, + subject: Issuer + ): IO[CredentialServiceError, PresentationPayload] = + svc.createPresentationPayload(recordId, subject) + + override def createCredentialPayloadFromRecord( + record: IssueCredentialRecord, + issuer: Issuer, + issuanceDate: Instant + ): IO[CredentialServiceError, W3cCredentialPayload] = + svc.createCredentialPayloadFromRecord(record, issuer, issuanceDate) + + override def publishCredentialBatch( + credentials: Seq[W3cCredentialPayload], + issuer: Issuer + ): IO[CredentialServiceError, PublishedBatchData] = + svc.publishCredentialBatch(credentials, issuer) + + override def markCredentialRecordsAsPublishQueued( + credentialsAndProofs: Seq[(W3cCredentialPayload, MerkleInclusionProof)] + ): IO[CredentialServiceError, Int] = + svc.markCredentialRecordsAsPublishQueued(credentialsAndProofs) + + override def markCredentialPublicationPending( + recordId: DidCommID + ): IO[CredentialServiceError, IssueCredentialRecord] = + svc.markCredentialPublicationPending(recordId) + + override def markCredentialPublicationQueued(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + svc.markCredentialPublicationQueued(recordId) + + override def markCredentialPublished(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + svc.markCredentialPublished(recordId) + + override def reportProcessingFailure( + recordId: DidCommID, + failReason: Option[_root_.java.lang.String] + ): IO[CredentialServiceError, Unit] = + svc.reportProcessingFailure(recordId, failReason) + + override def extractIdFromCredential(credential: W3cCredentialPayload): Option[DidCommID] = + svc.extractIdFromCredential(credential) + + override def getIssueCredentialRecord( + recordId: DidCommID + ): IO[CredentialServiceError, Option[IssueCredentialRecord]] = + svc.getIssueCredentialRecord(recordId) + + override def getIssueCredentialRecordByThreadId( + thid: DidCommID + ): IO[CredentialServiceError, Option[IssueCredentialRecord]] = + svc.getIssueCredentialRecordByThreadId(thid) + + override def getIssueCredentialRecords( + offset: Option[Int] = None, + limit: Option[Int] = None + ): IO[CredentialServiceError, (Seq[IssueCredentialRecord], Int)] = + svc.getIssueCredentialRecords(offset, limit) + + override def getIssueCredentialRecordsByStates( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: IssueCredentialRecord.ProtocolState* + ): IO[CredentialServiceError, Seq[IssueCredentialRecord]] = + svc.getIssueCredentialRecordsByStates(ignoreWithZeroRetries, limit, states: _*) +} + +object CredentialServiceNotifier { + val layer: URLayer[CredentialService & EventNotificationService, CredentialServiceNotifier] = + ZLayer.fromFunction(CredentialServiceNotifier(_, _)) +} diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/HttpURIDereferencerImpl.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/HttpURIDereferencerImpl.scala index 76fb4e8cd2..7f060f654d 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/HttpURIDereferencerImpl.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/HttpURIDereferencerImpl.scala @@ -3,14 +3,14 @@ package io.iohk.atala.pollux.core.service import io.iohk.atala.pollux.core.service.URIDereferencerError.{ConnectionError, ResourceNotFound, UnexpectedError} import zio.http.* import zio.http.model.* -import zio.* +import zio.{IO, ULayer, URLayer, ZIO, ZLayer} import java.net.URI -class HttpURIDereferencerImpl extends URIDereferencer { +class HttpURIDereferencerImpl(client: Client) extends URIDereferencer { override def dereference(uri: URI): IO[URIDereferencerError, String] = { - val result = for { + val result: ZIO[Client, URIDereferencerError, String] = for { response <- Client.request(uri.toString).mapError(t => ConnectionError(t.getMessage)) body <- response match case Response(Status.Ok, _, body, _, None) => @@ -20,16 +20,11 @@ class HttpURIDereferencerImpl extends URIDereferencer { case Response(_, _, _, _, httpError) => ZIO.fail(UnexpectedError(s"HTTP response error: $httpError")) } yield body - result - .provide(Scope.default >>> Client.default) - .mapError { - case e: URIDereferencerError => e - case t => UnexpectedError(t.toString) - } + result.provide(ZLayer.succeed(client)) } } object HttpURIDereferencerImpl { - val layer: ULayer[URIDereferencer] = ZLayer.succeed(HttpURIDereferencerImpl()) + val layer: URLayer[Client, URIDereferencer] = ZLayer.fromFunction(HttpURIDereferencerImpl(_)) } diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockCredentialService.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockCredentialService.scala new file mode 100644 index 0000000000..26d9257cb3 --- /dev/null +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockCredentialService.scala @@ -0,0 +1,197 @@ +package io.iohk.atala.pollux.core.service + +import io.circe.Json +import io.iohk.atala.castor.core.model.did.CanonicalPrismDID +import io.iohk.atala.mercury.model.DidId +import io.iohk.atala.mercury.protocol.issuecredential.{IssueCredential, OfferCredential, RequestCredential} +import io.iohk.atala.pollux.core.model.error.CredentialServiceError +import io.iohk.atala.pollux.core.model.{DidCommID, IssueCredentialRecord, PublishedBatchData} +import io.iohk.atala.pollux.vc.jwt.{Issuer, JWT, PresentationPayload, W3cCredentialPayload} +import io.iohk.atala.prism.crypto.MerkleInclusionProof +import zio.mock.{Mock, Proxy} +import zio.{IO, URLayer, ZIO, ZLayer, mock} + +import java.time.Instant + +object MockCredentialService extends Mock[CredentialService] { + + object CreateIssueCredentialRecord + extends Effect[ + ( + DidId, + DidId, + DidCommID, + Option[String], + Json, + Option[Double], + Option[Boolean], + Option[Boolean], + Option[CanonicalPrismDID] + ), + CredentialServiceError, + IssueCredentialRecord + ] + + object ReceiveCredentialOffer extends Effect[OfferCredential, CredentialServiceError, IssueCredentialRecord] + object AcceptCredentialOffer extends Effect[(DidCommID, String), CredentialServiceError, IssueCredentialRecord] + object CreatePresentationPayload extends Effect[(DidCommID, Issuer), CredentialServiceError, PresentationPayload] + object GenerateCredentialRequest extends Effect[(DidCommID, JWT), CredentialServiceError, IssueCredentialRecord] + object ReceiveCredentialRequest extends Effect[RequestCredential, CredentialServiceError, IssueCredentialRecord] + object AcceptCredentialRequest extends Effect[DidCommID, CredentialServiceError, IssueCredentialRecord] + object CreateCredentialPayloadFromRecord + extends Effect[(IssueCredentialRecord, Issuer, Instant), CredentialServiceError, W3cCredentialPayload] + object PublishCredentialBatch + extends Effect[(Seq[W3cCredentialPayload], Issuer), CredentialServiceError, PublishedBatchData] + object MarkCredentialRecordsAsPublishQueued + extends Effect[Seq[(W3cCredentialPayload, MerkleInclusionProof)], CredentialServiceError, Int] + object ReceiveCredentialIssue extends Effect[IssueCredential, CredentialServiceError, IssueCredentialRecord] + object MarkOfferSent extends Effect[DidCommID, CredentialServiceError, IssueCredentialRecord] + object MarkRequestSent extends Effect[DidCommID, CredentialServiceError, IssueCredentialRecord] + object MarkCredentialGenerated + extends Effect[(DidCommID, IssueCredential), CredentialServiceError, IssueCredentialRecord] + object MarkCredentialSent extends Effect[DidCommID, CredentialServiceError, IssueCredentialRecord] + object MarkCredentialPublicationPending extends Effect[DidCommID, CredentialServiceError, IssueCredentialRecord] + object MarkCredentialPublicationQueued extends Effect[DidCommID, CredentialServiceError, IssueCredentialRecord] + object MarkCredentialPublished extends Effect[DidCommID, CredentialServiceError, IssueCredentialRecord] + object ReportProcessingFailure extends Effect[(DidCommID, Option[String]), CredentialServiceError, Unit] + + override val compose: URLayer[mock.Proxy, CredentialService] = ZLayer { + for { + proxy <- ZIO.service[Proxy] + } yield new CredentialService { + + override def createIssueCredentialRecord( + pairwiseIssuerDID: DidId, + pairwiseHolderDID: DidId, + thid: DidCommID, + maybeSchemaId: Option[String], + claims: Json, + validityPeriod: Option[Double], + automaticIssuance: Option[Boolean], + awaitConfirmation: Option[Boolean], + issuingDID: Option[CanonicalPrismDID] + ): IO[CredentialServiceError, IssueCredentialRecord] = + proxy( + CreateIssueCredentialRecord, + pairwiseIssuerDID, + pairwiseHolderDID, + thid, + maybeSchemaId, + claims, + validityPeriod, + automaticIssuance, + awaitConfirmation, + issuingDID + ) + + override def receiveCredentialOffer(offer: OfferCredential): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(ReceiveCredentialOffer, offer) + + override def acceptCredentialOffer( + recordId: DidCommID, + subjectId: String + ): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(AcceptCredentialOffer, recordId, subjectId) + + override def createPresentationPayload( + recordId: DidCommID, + subject: Issuer + ): IO[CredentialServiceError, PresentationPayload] = + proxy(CreatePresentationPayload, recordId, subject) + + override def generateCredentialRequest( + recordId: DidCommID, + signedPresentation: JWT + ): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(GenerateCredentialRequest, recordId, signedPresentation) + + override def receiveCredentialRequest( + request: RequestCredential + ): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(ReceiveCredentialRequest, request) + + override def acceptCredentialRequest(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(AcceptCredentialRequest, recordId) + + override def createCredentialPayloadFromRecord( + record: IssueCredentialRecord, + issuer: Issuer, + issuanceDate: Instant + ): IO[CredentialServiceError, W3cCredentialPayload] = + proxy(CreateCredentialPayloadFromRecord, record, issuer, issuanceDate) + + override def publishCredentialBatch( + credentials: Seq[W3cCredentialPayload], + issuer: Issuer + ): IO[CredentialServiceError, PublishedBatchData] = + proxy(PublishCredentialBatch, credentials, issuer) + + override def markCredentialRecordsAsPublishQueued( + credentialsAndProofs: Seq[(W3cCredentialPayload, MerkleInclusionProof)] + ): IO[CredentialServiceError, Int] = + proxy(MarkCredentialRecordsAsPublishQueued, credentialsAndProofs) + + override def receiveCredentialIssue(issue: IssueCredential): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(ReceiveCredentialIssue, issue) + + override def markOfferSent(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(MarkOfferSent, recordId) + + override def markRequestSent(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(MarkRequestSent, recordId) + + override def markCredentialGenerated( + recordId: DidCommID, + issueCredential: IssueCredential + ): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(MarkCredentialGenerated, recordId, issueCredential) + + override def markCredentialSent(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(MarkCredentialSent, recordId) + + override def markCredentialPublicationPending( + recordId: DidCommID + ): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(MarkCredentialPublicationPending, recordId) + + override def markCredentialPublicationQueued( + recordId: DidCommID + ): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(MarkCredentialPublicationQueued, recordId) + + override def markCredentialPublished(recordId: DidCommID): IO[CredentialServiceError, IssueCredentialRecord] = + proxy(MarkCredentialPublished, recordId) + + override def reportProcessingFailure( + recordId: DidCommID, + failReason: Option[String] + ): IO[CredentialServiceError, Unit] = + proxy(ReportProcessingFailure, recordId, failReason) + + override def extractIdFromCredential(credential: W3cCredentialPayload): Option[DidCommID] = + ??? + + override def getIssueCredentialRecords( + offset: Option[Int] = None, + limit: Option[Int] = None + ): IO[CredentialServiceError, (Seq[IssueCredentialRecord], Int)] = + ??? + + override def getIssueCredentialRecordsByStates( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: IssueCredentialRecord.ProtocolState* + ): IO[CredentialServiceError, Seq[IssueCredentialRecord]] = + ??? + + override def getIssueCredentialRecord( + recordId: DidCommID + ): IO[CredentialServiceError, Option[IssueCredentialRecord]] = + ??? + + override def getIssueCredentialRecordByThreadId( + thid: DidCommID + ): IO[CredentialServiceError, Option[IssueCredentialRecord]] = ??? + } + } +} diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockPresentationService.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockPresentationService.scala new file mode 100644 index 0000000000..9319bfcb2d --- /dev/null +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockPresentationService.scala @@ -0,0 +1,154 @@ +package io.iohk.atala.pollux.core.service + +import io.iohk.atala.mercury.model.DidId +import io.iohk.atala.mercury.protocol.presentproof.{Presentation, ProofType, ProposePresentation, RequestPresentation} +import io.iohk.atala.pollux.core.model.error.PresentationError +import io.iohk.atala.pollux.core.model.presentation.Options +import io.iohk.atala.pollux.core.model.{DidCommID, PresentationRecord} +import io.iohk.atala.pollux.vc.jwt.{Issuer, PresentationPayload, W3cCredentialPayload} +import zio.mock.{Mock, Proxy} +import zio.{IO, URLayer, ZIO, ZLayer, mock} + +import java.time.Instant +import java.util.UUID + +object MockPresentationService extends Mock[PresentationService] { + + object CreatePresentationRecord + extends Effect[ + (DidId, DidId, DidCommID, Option[String], Seq[ProofType], Option[Options]), + PresentationError, + PresentationRecord + ] + + object MarkRequestPresentationSent extends Effect[DidCommID, PresentationError, PresentationRecord] + + object ReceivePresentation extends Effect[Presentation, PresentationError, PresentationRecord] + + object MarkPresentationVerified extends Effect[DidCommID, PresentationError, PresentationRecord] + + object MarkPresentationAccepted extends Effect[DidCommID, PresentationError, PresentationRecord] + + object MarkPresentationRejected extends Effect[DidCommID, PresentationError, PresentationRecord] + + object MarkPresentationVerificationFailed extends Effect[DidCommID, PresentationError, PresentationRecord] + + object AcceptRequestPresentation extends Effect[(DidCommID, Seq[String]), PresentationError, PresentationRecord] + + object RejectRequestPresentation extends Effect[DidCommID, PresentationError, PresentationRecord] + + object MarkPresentationGenerated extends Effect[(DidCommID, Presentation), PresentationError, PresentationRecord] + + object MarkPresentationSent extends Effect[DidCommID, PresentationError, PresentationRecord] + + object AcceptPresentation extends Effect[DidCommID, PresentationError, PresentationRecord] + + object RejectPresentation extends Effect[DidCommID, PresentationError, PresentationRecord] + + object ReceiveRequestPresentation + extends Effect[(Option[String], RequestPresentation), PresentationError, PresentationRecord] + + override val compose: URLayer[mock.Proxy, PresentationService] = ZLayer { + for { + proxy <- ZIO.service[Proxy] + } yield new PresentationService { + + override def createPresentationRecord( + pairwiseVerifierDID: DidId, + pairwiseProverDID: DidId, + thid: DidCommID, + connectionId: Option[String], + proofTypes: Seq[ProofType], + options: Option[Options] + ): IO[PresentationError, PresentationRecord] = + proxy( + CreatePresentationRecord, + (pairwiseVerifierDID, pairwiseProverDID, thid, connectionId, proofTypes, options) + ) + + override def acceptRequestPresentation( + recordId: DidCommID, + credentialsToUse: Seq[String] + ): IO[PresentationError, PresentationRecord] = + proxy(AcceptRequestPresentation, (recordId, credentialsToUse)) + + override def rejectRequestPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + proxy(RejectRequestPresentation, recordId) + + override def receivePresentation(presentation: Presentation): IO[PresentationError, PresentationRecord] = + proxy(ReceivePresentation, presentation) + + override def markRequestPresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + proxy(MarkRequestPresentationSent, recordId) + + override def markPresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + proxy(MarkPresentationSent, recordId) + + override def markPresentationGenerated( + recordId: DidCommID, + presentation: Presentation + ): IO[PresentationError, PresentationRecord] = + proxy(MarkPresentationGenerated, (recordId, presentation)) + + override def markPresentationVerified(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + proxy(MarkPresentationVerified, recordId) + + override def markPresentationRejected(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + proxy(MarkPresentationRejected, recordId) + + override def markPresentationAccepted(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + proxy(MarkPresentationAccepted, recordId) + + override def markPresentationVerificationFailed(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + proxy(MarkPresentationVerificationFailed, recordId) + + override def receiveRequestPresentation( + connectionId: Option[String], + request: RequestPresentation + ): IO[PresentationError, PresentationRecord] = + proxy(ReceiveRequestPresentation, (connectionId, request)) + + override def acceptPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + proxy(AcceptPresentation, recordId) + + override def rejectPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + proxy(RejectPresentation, recordId) + + override def extractIdFromCredential(credential: W3cCredentialPayload): Option[UUID] = ??? + + override def getPresentationRecords(): IO[PresentationError, Seq[PresentationRecord]] = ??? + + override def createPresentationPayloadFromRecord( + record: DidCommID, + issuer: Issuer, + issuanceDate: Instant + ): IO[PresentationError, PresentationPayload] = ??? + + override def getPresentationRecordsByStates( + ignoreWithZeroRetries: Boolean, + limit: Int, + state: PresentationRecord.ProtocolState* + ): IO[PresentationError, Seq[PresentationRecord]] = ??? + + override def getPresentationRecord(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = ??? + + override def getPresentationRecordByThreadId(thid: DidCommID): IO[PresentationError, Option[PresentationRecord]] = + ??? + + override def receiveProposePresentation(request: ProposePresentation): IO[PresentationError, PresentationRecord] = + ??? + + override def acceptProposePresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = ??? + + override def markRequestPresentationRejected(recordId: DidCommID): IO[PresentationError, PresentationRecord] = ??? + + override def markProposePresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] = ??? + + override def reportProcessingFailure( + recordId: DidCommID, + failReason: Option[String] + ): IO[PresentationError, Unit] = ??? + } + } + +} diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationService.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationService.scala index 7d54a26c58..8122a59fd5 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationService.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationService.scala @@ -50,41 +50,41 @@ trait PresentationService { def acceptRequestPresentation( recordId: DidCommID, - crecentialsToUse: Seq[String] - ): IO[PresentationError, Option[PresentationRecord]] + credentialsToUse: Seq[String] + ): IO[PresentationError, PresentationRecord] - def rejectRequestPresentation(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def rejectRequestPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] - def receiveProposePresentation(request: ProposePresentation): IO[PresentationError, Option[PresentationRecord]] + def receiveProposePresentation(request: ProposePresentation): IO[PresentationError, PresentationRecord] - def acceptProposePresentation(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def acceptProposePresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] - def receivePresentation(presentation: Presentation): IO[PresentationError, Option[PresentationRecord]] + def receivePresentation(presentation: Presentation): IO[PresentationError, PresentationRecord] - def acceptPresentation(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def acceptPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] - def rejectPresentation(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def rejectPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] - def markRequestPresentationSent(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def markRequestPresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] - def markRequestPresentationRejected(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def markRequestPresentationRejected(recordId: DidCommID): IO[PresentationError, PresentationRecord] - def markProposePresentationSent(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def markProposePresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] - def markPresentationSent(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def markPresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] def markPresentationGenerated( recordId: DidCommID, presentation: Presentation - ): IO[PresentationError, Option[PresentationRecord]] + ): IO[PresentationError, PresentationRecord] - def markPresentationVerified(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def markPresentationVerified(recordId: DidCommID): IO[PresentationError, PresentationRecord] - def markPresentationRejected(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def markPresentationRejected(recordId: DidCommID): IO[PresentationError, PresentationRecord] - def markPresentationAccepted(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def markPresentationAccepted(recordId: DidCommID): IO[PresentationError, PresentationRecord] - def markPresentationVerificationFailed(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] + def markPresentationVerificationFailed(recordId: DidCommID): IO[PresentationError, PresentationRecord] def reportProcessingFailure(recordId: DidCommID, failReason: Option[String]): IO[PresentationError, Unit] diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceImpl.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceImpl.scala index 1cb2cb6a0c..e35a0a2ed6 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceImpl.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceImpl.scala @@ -31,7 +31,7 @@ private class PresentationServiceImpl( override def markPresentationGenerated( recordId: DidCommID, presentation: Presentation - ): IO[PresentationError, Option[PresentationRecord]] = { + ): IO[PresentationError, PresentationRecord] = { for { record <- getRecordWithState(recordId, ProtocolState.PresentationPending) count <- presentationRepository @@ -43,7 +43,10 @@ private class PresentationServiceImpl( record <- presentationRepository .getPresentationRecord(recordId) .mapError(RepositoryError.apply) - + .flatMap { + case None => ZIO.fail(RecordIdNotFound(record.id)) + case Some(value) => ZIO.succeed(value) + } } yield record } @@ -60,7 +63,6 @@ private class PresentationServiceImpl( record <- ZIO .fromOption(maybeRecord) .mapError(_ => RecordIdNotFound(recordId)) - _ <- ZIO.log(record.toString()) credentialsToUse <- ZIO .fromOption(record.credentialsToUse) .mapError(_ => InvalidFlowStateError(s"No request found for this record: $recordId")) @@ -113,10 +115,10 @@ private class PresentationServiceImpl( .mapError(RepositoryError.apply) } yield record - override def rejectRequestPresentation(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = { + override def rejectRequestPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = { markRequestPresentationRejected(recordId) } - def rejectPresentation(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = { + def rejectPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = { markPresentationRejected(recordId) } @@ -285,7 +287,7 @@ private class PresentationServiceImpl( def acceptRequestPresentation( recordId: DidCommID, credentialsToUse: Seq[String] - ): IO[PresentationError, Option[PresentationRecord]] = { + ): IO[PresentationError, PresentationRecord] = { for { record <- getRecordWithState(recordId, ProtocolState.RequestReceived) @@ -319,10 +321,14 @@ private class PresentationServiceImpl( record <- presentationRepository .getPresentationRecord(recordId) .mapError(RepositoryError.apply) + .flatMap { + case None => ZIO.fail(RecordIdNotFound(record.id)) + case Some(value) => ZIO.succeed(value) + } } yield record } - override def acceptPresentation(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = { + override def acceptPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = { for { maybeRecord <- presentationRepository .getPresentationRecord(recordId) @@ -330,8 +336,7 @@ private class PresentationServiceImpl( record <- ZIO .fromOption(maybeRecord) .mapError(_ => RecordIdNotFound(recordId)) - _ <- ZIO.log(record.toString()) - presentationRequest <- ZIO + _ <- ZIO .fromOption(record.presentationData) .mapError(_ => InvalidFlowStateError(s"No request found for this record: $recordId")) recordUpdated <- markPresentationAccepted(record.id) @@ -339,7 +344,7 @@ private class PresentationServiceImpl( } override def receivePresentation( presentation: Presentation - ): IO[PresentationError, Option[PresentationRecord]] = { + ): IO[PresentationError, PresentationRecord] = { for { record <- getRecordFromThreadId(presentation.thid) _ <- presentationRepository @@ -352,10 +357,14 @@ private class PresentationServiceImpl( record <- presentationRepository .getPresentationRecord(record.id) .mapError(RepositoryError.apply) + .flatMap { + case None => ZIO.fail(RecordIdNotFound(record.id)) + case Some(value) => ZIO.succeed(value) + } } yield record } - override def acceptProposePresentation(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = { + override def acceptProposePresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = { for { maybeRecord <- presentationRepository .getPresentationRecord(recordId) @@ -377,12 +386,16 @@ private class PresentationServiceImpl( record <- presentationRepository .getPresentationRecord(record.id) .mapError(RepositoryError.apply) + .flatMap { + case None => ZIO.fail(RecordIdNotFound(record.id)) + case Some(value) => ZIO.succeed(value) + } } yield record } override def receiveProposePresentation( proposePresentation: ProposePresentation - ): IO[PresentationError, Option[PresentationRecord]] = { + ): IO[PresentationError, PresentationRecord] = { for { record <- getRecordFromThreadId(proposePresentation.thid) _ <- presentationRepository @@ -395,6 +408,10 @@ private class PresentationServiceImpl( record <- presentationRepository .getPresentationRecord(record.id) .mapError(RepositoryError.apply) + .flatMap { + case None => ZIO.fail(RecordIdNotFound(record.id)) + case Some(value) => ZIO.succeed(value) + } } yield record } @@ -416,48 +433,48 @@ private class PresentationServiceImpl( } yield record } - override def markRequestPresentationSent(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = + override def markRequestPresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] = updatePresentationRecordProtocolState( recordId, PresentationRecord.ProtocolState.RequestPending, PresentationRecord.ProtocolState.RequestSent ) - override def markProposePresentationSent(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = + override def markProposePresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] = updatePresentationRecordProtocolState( recordId, PresentationRecord.ProtocolState.ProposalPending, PresentationRecord.ProtocolState.ProposalSent ) - override def markPresentationVerified(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = + override def markPresentationVerified(recordId: DidCommID): IO[PresentationError, PresentationRecord] = updatePresentationRecordProtocolState( recordId, PresentationRecord.ProtocolState.PresentationReceived, PresentationRecord.ProtocolState.PresentationVerified ) - override def markPresentationAccepted(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = + override def markPresentationAccepted(recordId: DidCommID): IO[PresentationError, PresentationRecord] = updatePresentationRecordProtocolState( recordId, PresentationRecord.ProtocolState.PresentationVerified, PresentationRecord.ProtocolState.PresentationAccepted ) - override def markPresentationSent(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = + override def markPresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] = updatePresentationRecordProtocolState( recordId, PresentationRecord.ProtocolState.PresentationGenerated, PresentationRecord.ProtocolState.PresentationSent ) - override def markPresentationRejected(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = + override def markPresentationRejected(recordId: DidCommID): IO[PresentationError, PresentationRecord] = updatePresentationRecordProtocolState( recordId, - PresentationRecord.ProtocolState.PresentationReceived, + PresentationRecord.ProtocolState.PresentationVerified, PresentationRecord.ProtocolState.PresentationRejected ) - override def markRequestPresentationRejected(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = + override def markRequestPresentationRejected(recordId: DidCommID): IO[PresentationError, PresentationRecord] = updatePresentationRecordProtocolState( recordId, PresentationRecord.ProtocolState.RequestReceived, @@ -466,7 +483,7 @@ private class PresentationServiceImpl( override def markPresentationVerificationFailed( recordId: DidCommID - ): IO[PresentationError, Option[PresentationRecord]] = + ): IO[PresentationError, PresentationRecord] = updatePresentationRecordProtocolState( recordId, PresentationRecord.ProtocolState.PresentationReceived, @@ -550,7 +567,7 @@ private class PresentationServiceImpl( id: DidCommID, from: PresentationRecord.ProtocolState, to: PresentationRecord.ProtocolState - ): IO[PresentationError, Option[PresentationRecord]] = { + ): IO[PresentationError, PresentationRecord] = { for { _ <- presentationRepository .updatePresentationRecordProtocolState(id, from, to) @@ -562,6 +579,10 @@ private class PresentationServiceImpl( record <- presentationRepository .getPresentationRecord(id) .mapError(RepositoryError.apply) + .flatMap { + case None => ZIO.fail(RecordIdNotFound(id)) + case Some(value) => ZIO.succeed(value) + } } yield record } diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceNotifier.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceNotifier.scala new file mode 100644 index 0000000000..535418c03d --- /dev/null +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceNotifier.scala @@ -0,0 +1,144 @@ +package io.iohk.atala.pollux.core.service +import io.iohk.atala.event.notification.{Event, EventNotificationService} +import io.iohk.atala.mercury.model.DidId +import io.iohk.atala.mercury.protocol.presentproof.{Presentation, ProofType, ProposePresentation, RequestPresentation} +import io.iohk.atala.pollux.core.model.error.PresentationError +import io.iohk.atala.pollux.core.model.presentation.Options +import io.iohk.atala.pollux.core.model.{DidCommID, PresentationRecord} +import io.iohk.atala.pollux.vc.jwt.{Issuer, PresentationPayload, W3cCredentialPayload} +import zio.{IO, URLayer, ZIO, ZLayer} + +import java.time.Instant +import java.util.UUID + +class PresentationServiceNotifier( + svc: PresentationService, + eventNotificationService: EventNotificationService +) extends PresentationService { + + private val presentationUpdatedEvent = "PresentationUpdated" + + override def createPresentationRecord( + pairwiseVerifierDID: DidId, + pairwiseProverDID: DidId, + thid: DidCommID, + connectionId: Option[String], + proofTypes: Seq[ProofType], + options: Option[Options] + ): IO[PresentationError, PresentationRecord] = + notifyOnSuccess( + svc.createPresentationRecord(pairwiseVerifierDID, pairwiseProverDID, thid, connectionId, proofTypes, options) + ) + + override def markRequestPresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.markRequestPresentationSent(recordId)) + + override def receiveRequestPresentation( + connectionId: Option[String], + request: RequestPresentation + ): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.receiveRequestPresentation(connectionId, request)) + + override def markRequestPresentationRejected( + recordId: DidCommID + ): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.markRequestPresentationRejected(recordId)) + + override def acceptRequestPresentation( + recordId: DidCommID, + credentialsToUse: Seq[String] + ): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.acceptRequestPresentation(recordId, credentialsToUse)) + + override def rejectRequestPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.rejectRequestPresentation(recordId)) + + override def markPresentationGenerated( + recordId: DidCommID, + presentation: Presentation + ): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.markPresentationGenerated(recordId, presentation)) + + override def markPresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.markPresentationSent(recordId)) + + override def receivePresentation(presentation: Presentation): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.receivePresentation(presentation)) + + override def markPresentationVerified(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.markPresentationVerified(recordId)) + + override def markPresentationVerificationFailed( + recordId: DidCommID + ): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.markPresentationVerificationFailed(recordId)) + + override def acceptPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.acceptPresentation(recordId)) + + override def rejectPresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + notifyOnSuccess(svc.rejectPresentation(recordId)) + + private[this] def notifyOnSuccess(effect: IO[PresentationError, PresentationRecord]) = + for { + record <- effect + _ <- notify(record) + } yield record + + private[this] def notify(record: PresentationRecord) = { + val result = for { + producer <- eventNotificationService.producer[PresentationRecord]("Presentation") + _ <- producer.send(Event(presentationUpdatedEvent, record)) + } yield () + result.catchAll(e => ZIO.logError(s"Notification service error: $e")) + } + + override def extractIdFromCredential(credential: W3cCredentialPayload): Option[UUID] = + svc.extractIdFromCredential(credential) + + override def getPresentationRecords(): IO[PresentationError, Seq[PresentationRecord]] = svc.getPresentationRecords() + + override def createPresentationPayloadFromRecord( + record: DidCommID, + issuer: Issuer, + issuanceDate: Instant + ): IO[PresentationError, PresentationPayload] = svc.createPresentationPayloadFromRecord(record, issuer, issuanceDate) + + override def getPresentationRecordsByStates( + ignoreWithZeroRetries: Boolean, + limit: Int, + state: PresentationRecord.ProtocolState* + ): IO[PresentationError, Seq[PresentationRecord]] = + svc.getPresentationRecordsByStates(ignoreWithZeroRetries, limit, state: _*) + + override def getPresentationRecord(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = + svc.getPresentationRecord(recordId) + + override def getPresentationRecordByThreadId(thid: DidCommID): IO[PresentationError, Option[PresentationRecord]] = + svc.getPresentationRecordByThreadId(thid) + + override def receiveProposePresentation(request: ProposePresentation): IO[PresentationError, PresentationRecord] = + svc.receiveProposePresentation((request)) + + override def acceptProposePresentation(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + svc.acceptPresentation(recordId) + + override def markProposePresentationSent(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + svc.markProposePresentationSent(recordId) + + override def markPresentationAccepted(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + svc.markPresentationAccepted(recordId) + + override def markPresentationRejected(recordId: DidCommID): IO[PresentationError, PresentationRecord] = + svc.markPresentationRejected(recordId) + + override def reportProcessingFailure( + recordId: DidCommID, + failReason: Option[_root_.java.lang.String] + ): IO[PresentationError, Unit] = svc.reportProcessingFailure(recordId, failReason) +} + +object PresentationServiceNotifier { + val layer: URLayer[EventNotificationService & PresentationService, PresentationService] = + ZLayer.fromFunction(PresentationServiceNotifier(_, _)) +} diff --git a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/CredentialServiceImplSpec.scala b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/CredentialServiceImplSpec.scala index ddbb82cabb..8ee6fe8c34 100644 --- a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/CredentialServiceImplSpec.scala +++ b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/CredentialServiceImplSpec.scala @@ -2,31 +2,21 @@ package io.iohk.atala.pollux.core.service import io.circe.Json import io.circe.syntax.* -import io.grpc.ManagedChannelBuilder import io.iohk.atala.castor.core.model.did.CanonicalPrismDID -import io.iohk.atala.iris.proto.service.IrisServiceGrpc -import io.iohk.atala.mercury.model.{AttachmentDescriptor, DidId, Message} +import io.iohk.atala.mercury.model.{DidId, Message} import io.iohk.atala.mercury.protocol.issuecredential.* import io.iohk.atala.pollux.core.model.* import io.iohk.atala.pollux.core.model.IssueCredentialRecord.* import io.iohk.atala.pollux.core.model.error.CredentialServiceError import io.iohk.atala.pollux.core.model.error.CredentialServiceError.* -import io.iohk.atala.pollux.core.model.presentation.{ClaimFormat, Ldp, Options, PresentationDefinition} -import io.iohk.atala.pollux.core.repository.CredentialRepositoryInMemory import io.iohk.atala.pollux.vc.jwt.* import zio.* import zio.test.* + import java.nio.charset.StandardCharsets import java.util.{Base64, UUID} -object CredentialServiceImplSpec extends ZIOSpecDefault { - - val irisStubLayer = ZLayer.fromZIO( - ZIO.succeed(IrisServiceGrpc.stub(ManagedChannelBuilder.forAddress("localhost", 9999).usePlaintext.build)) - ) - val didResolverLayer = ZLayer.fromZIO(ZIO.succeed(makeResolver(Map.empty))) - val credentialServiceLayer = - irisStubLayer ++ CredentialRepositoryInMemory.layer ++ didResolverLayer ++ ResourceURIDereferencerImpl.layer >>> CredentialServiceImpl.layer +object CredentialServiceImplSpec extends ZIOSpecDefault with CredentialServiceSpecHelper { override def spec = { suite("CredentialServiceImpl")( @@ -527,92 +517,4 @@ object CredentialServiceImplSpec extends ZIOSpecDefault { ).provideLayer(credentialServiceLayer) } - private[this] def offerCredential( - thid: Option[UUID] = Some(UUID.randomUUID()) - ) = OfferCredential( - from = DidId("did:prism:issuer"), - to = DidId("did:prism:holder"), - thid = thid.map(_.toString), - attachments = Seq( - AttachmentDescriptor.buildJsonAttachment( - payload = CredentialOfferAttachment( - Options(UUID.randomUUID().toString(), "my-domain"), - PresentationDefinition(format = Some(ClaimFormat(ldp = Some(Ldp(Seq("EcdsaSecp256k1Signature2019")))))) - ) - ) - ), - body = OfferCredential.Body( - goal_code = Some("Offer Credential"), - credential_preview = CredentialPreview(attributes = Seq(Attribute("name", "Alice"))) - ) - ) - - private[this] def requestCredential(thid: Option[DidCommID] = Some(DidCommID())) = RequestCredential( - from = DidId("did:prism:holder"), - to = DidId("did:prism:issuer"), - thid = thid.map(_.toString), - attachments = Nil, - body = RequestCredential.Body() - ) - - private[this] def issueCredential(thid: Option[DidCommID] = Some(DidCommID())) = IssueCredential( - from = DidId("did:prism:issuer"), - to = DidId("did:prism:holder"), - thid = thid.map(_.toString), - attachments = Nil, - body = IssueCredential.Body() - ) - - private[this] def makeResolver(lookup: Map[String, DIDDocument]): DidResolver = (didUrl: String) => { - lookup - .get(didUrl) - .fold( - ZIO.succeed(DIDResolutionFailed(NotFound(s"DIDDocument not found for $didUrl"))) - )((didDocument: DIDDocument) => { - ZIO.succeed( - DIDResolutionSucceeded( - didDocument, - DIDDocumentMetadata() - ) - ) - }) - } - - val defaultClaims = io.circe.parser - .parse(""" - |{ - | "name":"Alice", - | "address": { - | "street": "Street Name", - | "number": "12" - | } - |} - |""".stripMargin) - .getOrElse(Json.Null) - - extension (svc: CredentialService) - def createRecord( - pairwiseIssuerDID: DidId = DidId("did:prism:issuer"), - pairwiseHolderDID: DidId = DidId("did:prism:holder-pairwise"), - thid: DidCommID = DidCommID(), - schemaId: Option[String] = None, - claims: Json = defaultClaims, - validityPeriod: Option[Double] = None, - automaticIssuance: Option[Boolean] = None, - awaitConfirmation: Option[Boolean] = None, - issuingDID: Option[CanonicalPrismDID] = None - ) = { - svc.createIssueCredentialRecord( - pairwiseIssuerDID = pairwiseIssuerDID, - pairwiseHolderDID = pairwiseHolderDID, - thid = thid, - maybeSchemaId = schemaId, - claims = claims, - validityPeriod = validityPeriod, - automaticIssuance = automaticIssuance, - awaitConfirmation = awaitConfirmation, - issuingDID = issuingDID - ) - } - } diff --git a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/CredentialServiceNotifierSpec.scala b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/CredentialServiceNotifierSpec.scala new file mode 100644 index 0000000000..482538290d --- /dev/null +++ b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/CredentialServiceNotifierSpec.scala @@ -0,0 +1,145 @@ +package io.iohk.atala.pollux.core.service + +import io.iohk.atala.event.notification.{EventNotificationService, EventNotificationServiceImpl} +import io.iohk.atala.mercury.protocol.issuecredential.* +import io.iohk.atala.pollux.core.model.* +import io.iohk.atala.pollux.core.model.IssueCredentialRecord.ProtocolState +import io.iohk.atala.pollux.core.model.error.CredentialServiceError +import io.iohk.atala.pollux.vc.jwt.JWT +import zio.* +import zio.mock.Expectation +import zio.test.{Assertion, *} + +import java.time.Instant + +object CredentialServiceNotifierSpec extends ZIOSpecDefault with CredentialServiceSpecHelper { + + private val issueCredentialRecord = IssueCredentialRecord( + DidCommID(), + Instant.now, + None, + DidCommID(), + None, + IssueCredentialRecord.Role.Issuer, + None, + None, + None, + None, + ProtocolState.OfferPending, + None, + None, + None, + None, + None, + None, + 5, + None, + None + ) + + private val issuerExpectations = + MockCredentialService.CreateIssueCredentialRecord( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord) + ) ++ + MockCredentialService.MarkOfferSent( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord.copy(protocolState = ProtocolState.OfferSent)) + ) ++ + MockCredentialService.ReceiveCredentialRequest( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord.copy(protocolState = ProtocolState.RequestReceived)) + ) ++ + MockCredentialService.AcceptCredentialRequest( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord.copy(protocolState = ProtocolState.CredentialPending)) + ) ++ + MockCredentialService.MarkCredentialGenerated( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord.copy(protocolState = ProtocolState.CredentialGenerated)) + ) ++ + MockCredentialService.MarkCredentialSent( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord.copy(protocolState = ProtocolState.CredentialSent)) + ) + + private val holderExpectations = + MockCredentialService.ReceiveCredentialOffer( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord.copy(protocolState = ProtocolState.OfferReceived)) + ) ++ MockCredentialService.AcceptCredentialOffer( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord.copy(protocolState = ProtocolState.RequestPending)) + ) ++ + MockCredentialService.GenerateCredentialRequest( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord.copy(protocolState = ProtocolState.RequestGenerated)) + ) ++ + MockCredentialService.MarkRequestSent( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord.copy(protocolState = ProtocolState.RequestSent)) + ) ++ + MockCredentialService.ReceiveCredentialIssue( + assertion = Assertion.anything, + result = Expectation.value(issueCredentialRecord.copy(protocolState = ProtocolState.CredentialReceived)) + ) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + suite("CredentialServiceWithEventNotificationImpl")( + test("Happy flow generates relevant events on issuer side") { + for { + svc <- ZIO.service[CredentialService] + ens <- ZIO.service[EventNotificationService] + + offerCreatedRecord <- svc.createRecord() + issuerRecordId = offerCreatedRecord.id + _ <- svc.markOfferSent(issuerRecordId) + _ <- svc.receiveCredentialRequest(requestCredential()) + _ <- svc.acceptCredentialRequest(issuerRecordId) + _ <- svc.markCredentialGenerated(issuerRecordId, issueCredential()) + _ <- svc.markCredentialSent(issuerRecordId) + consumer <- ens.consumer[IssueCredentialRecord]("Issue") + events <- consumer.poll(50) + } yield { + assertTrue(events.size == 6) && + assertTrue(events.head.data.protocolState == ProtocolState.OfferPending) && + assertTrue(events(1).data.protocolState == ProtocolState.OfferSent) && + assertTrue(events(2).data.protocolState == ProtocolState.RequestReceived) && + assertTrue(events(3).data.protocolState == ProtocolState.CredentialPending) && + assertTrue(events(4).data.protocolState == ProtocolState.CredentialGenerated) && + assertTrue(events(5).data.protocolState == ProtocolState.CredentialSent) + } + }.provide( + ZLayer.succeed(50) >>> EventNotificationServiceImpl.layer, + issuerExpectations.toLayer >>> CredentialServiceNotifier.layer + ), + test("Happy flow generates relevant events on the holder side") { + for { + svc <- ZIO.service[CredentialService] + ens <- ZIO.service[EventNotificationService] + + offerReceivedRecord <- svc.receiveCredentialOffer(offerCredential()) + holderRecordId = offerReceivedRecord.id + subjectId = "did:prism:60821d6833158c93fde5bb6a40d69996a683bf1fa5cdf32c458395b2887597c3" + _ <- svc.acceptCredentialOffer(holderRecordId, subjectId) + _ <- svc.generateCredentialRequest(offerReceivedRecord.id, JWT("Fake JWT")) + _ <- svc.markRequestSent(holderRecordId) + _ <- svc.receiveCredentialIssue(issueCredential()) + consumer <- ens.consumer[IssueCredentialRecord]("Issue") + events <- consumer.poll(50) + } yield { + assertTrue(events.size == 5) && + assertTrue(events.head.data.protocolState == ProtocolState.OfferReceived) && + assertTrue(events(1).data.protocolState == ProtocolState.RequestPending) && + assertTrue(events(2).data.protocolState == ProtocolState.RequestGenerated) && + assertTrue(events(3).data.protocolState == ProtocolState.RequestSent) && + assertTrue(events(4).data.protocolState == ProtocolState.CredentialReceived) + } + }.provide( + ZLayer.succeed(50) >>> EventNotificationServiceImpl.layer, + holderExpectations.toLayer >>> CredentialServiceNotifier.layer + ) + ) + } + +} diff --git a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/CredentialServiceSpecHelper.scala b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/CredentialServiceSpecHelper.scala new file mode 100644 index 0000000000..a4530d9394 --- /dev/null +++ b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/CredentialServiceSpecHelper.scala @@ -0,0 +1,116 @@ +package io.iohk.atala.pollux.core.service + +import io.circe.Json +import io.grpc.ManagedChannelBuilder +import io.iohk.atala.castor.core.model.did.CanonicalPrismDID +import io.iohk.atala.iris.proto.service.IrisServiceGrpc +import io.iohk.atala.mercury.model.{AttachmentDescriptor, DidId} +import io.iohk.atala.mercury.protocol.issuecredential.* +import io.iohk.atala.pollux.core.model.* +import io.iohk.atala.pollux.core.model.error.CredentialServiceError +import io.iohk.atala.pollux.core.model.error.CredentialServiceError.* +import io.iohk.atala.pollux.core.model.presentation.{ClaimFormat, Ldp, Options, PresentationDefinition} +import io.iohk.atala.pollux.core.repository.CredentialRepositoryInMemory +import io.iohk.atala.pollux.vc.jwt.* +import zio.* + +import java.util.UUID + +trait CredentialServiceSpecHelper { + + protected val irisStubLayer = ZLayer.fromZIO( + ZIO.succeed(IrisServiceGrpc.stub(ManagedChannelBuilder.forAddress("localhost", 9999).usePlaintext.build)) + ) + protected val didResolverLayer = ZLayer.fromZIO(ZIO.succeed(makeResolver(Map.empty))) + protected val credentialServiceLayer = + irisStubLayer ++ CredentialRepositoryInMemory.layer ++ didResolverLayer ++ ResourceURIDereferencerImpl.layer >>> CredentialServiceImpl.layer + + protected def offerCredential( + thid: Option[UUID] = Some(UUID.randomUUID()) + ) = OfferCredential( + from = DidId("did:prism:issuer"), + to = DidId("did:prism:holder"), + thid = thid.map(_.toString), + attachments = Seq( + AttachmentDescriptor.buildJsonAttachment( + payload = CredentialOfferAttachment( + Options(UUID.randomUUID().toString(), "my-domain"), + PresentationDefinition(format = Some(ClaimFormat(ldp = Some(Ldp(Seq("EcdsaSecp256k1Signature2019")))))) + ) + ) + ), + body = OfferCredential.Body( + goal_code = Some("Offer Credential"), + credential_preview = CredentialPreview(attributes = Seq(Attribute("name", "Alice"))) + ) + ) + + protected def requestCredential(thid: Option[DidCommID] = Some(DidCommID())) = RequestCredential( + from = DidId("did:prism:holder"), + to = DidId("did:prism:issuer"), + thid = thid.map(_.toString), + attachments = Nil, + body = RequestCredential.Body() + ) + + protected def issueCredential(thid: Option[DidCommID] = Some(DidCommID())) = IssueCredential( + from = DidId("did:prism:issuer"), + to = DidId("did:prism:holder"), + thid = thid.map(_.toString), + attachments = Nil, + body = IssueCredential.Body() + ) + + protected def makeResolver(lookup: Map[String, DIDDocument]): DidResolver = (didUrl: String) => { + lookup + .get(didUrl) + .fold( + ZIO.succeed(DIDResolutionFailed(NotFound(s"DIDDocument not found for $didUrl"))) + )((didDocument: DIDDocument) => { + ZIO.succeed( + DIDResolutionSucceeded( + didDocument, + DIDDocumentMetadata() + ) + ) + }) + } + + val defaultClaims = io.circe.parser + .parse(""" + |{ + | "name":"Alice", + | "address": { + | "street": "Street Name", + | "number": "12" + | } + |} + |""".stripMargin) + .getOrElse(Json.Null) + + extension (svc: CredentialService) + def createRecord( + pairwiseIssuerDID: DidId = DidId("did:prism:issuer"), + pairwiseHolderDID: DidId = DidId("did:prism:holder-pairwise"), + thid: DidCommID = DidCommID(), + schemaId: Option[String] = None, + claims: Json = defaultClaims, + validityPeriod: Option[Double] = None, + automaticIssuance: Option[Boolean] = None, + awaitConfirmation: Option[Boolean] = None, + issuingDID: Option[CanonicalPrismDID] = None + ) = { + svc.createIssueCredentialRecord( + pairwiseIssuerDID = pairwiseIssuerDID, + pairwiseHolderDID = pairwiseHolderDID, + thid = thid, + maybeSchemaId = schemaId, + claims = claims, + validityPeriod = validityPeriod, + automaticIssuance = automaticIssuance, + awaitConfirmation = awaitConfirmation, + issuingDID = issuingDID + ) + } + +} diff --git a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/PresentationServiceNotifierSpec.scala b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/PresentationServiceNotifierSpec.scala new file mode 100644 index 0000000000..5725d8a04b --- /dev/null +++ b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/PresentationServiceNotifierSpec.scala @@ -0,0 +1,195 @@ +package io.iohk.atala.pollux.core.service + +import io.iohk.atala.event.notification.{EventNotificationService, EventNotificationServiceImpl} +import io.iohk.atala.mercury.model.DidId +import io.iohk.atala.mercury.protocol.presentproof.{Presentation, RequestPresentation} +import io.iohk.atala.pollux.core.model.PresentationRecord.ProtocolState +import io.iohk.atala.pollux.core.model.{DidCommID, PresentationRecord} +import zio.mock.Expectation +import zio.test.{Assertion, Spec, TestEnvironment, ZIOSpecDefault, assertTrue} +import zio.{Scope, ZIO, ZLayer} + +import java.time.Instant + +object PresentationServiceNotifierSpec extends ZIOSpecDefault with PresentationServiceSpecHelper { + + private val record = PresentationRecord( + DidCommID(""), + Instant.now(), + None, + DidCommID(""), + None, + None, + PresentationRecord.Role.Verifier, + DidId(""), + ProtocolState.RequestPending, + None, + None, + None, + None, + 5, + None, + None + ) + + private val verifierHappyFlowExpectations = + MockPresentationService.CreatePresentationRecord( + assertion = Assertion.anything, + result = Expectation.value(record) + ) ++ + MockPresentationService.MarkRequestPresentationSent( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.PresentationSent)) + ) ++ + MockPresentationService.ReceivePresentation( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.PresentationReceived)) + ) ++ + MockPresentationService.MarkPresentationVerified( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.PresentationVerified)) + ) ++ + MockPresentationService.AcceptPresentation( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.PresentationAccepted)) + ) + + private val verifierPresentationVerificationFailedExpectations = + MockPresentationService.MarkPresentationVerificationFailed( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.PresentationVerificationFailed)) + ) + + private val verifierRejectPresentationExpectations = + MockPresentationService.RejectPresentation( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.PresentationRejected)) + ) + + private val proverHappyFlowExpectations = + MockPresentationService.ReceiveRequestPresentation( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.RequestReceived)) + ) ++ + MockPresentationService.AcceptRequestPresentation( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.PresentationAccepted)) + ) ++ + MockPresentationService.MarkPresentationGenerated( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.PresentationGenerated)) + ) ++ + MockPresentationService.MarkPresentationSent( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.PresentationSent)) + ) + + private val proverRejectPresentationRequestExpectations = + MockPresentationService.RejectRequestPresentation( + assertion = Assertion.anything, + result = Expectation.value(record.copy(protocolState = ProtocolState.RequestRejected)) + ) + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("PresentationServiceWithEventNotificationImpl")( + test("Happy flow generates relevant events on the verifier side") { + for { + svc <- ZIO.service[PresentationService] + ens <- ZIO.service[EventNotificationService] + + record <- svc.createPresentationRecord(DidId(""), DidId(""), DidCommID(""), None, Seq.empty, None) + _ <- svc.markRequestPresentationSent(record.id) + _ <- svc.receivePresentation(presentation(record.thid.value)) + _ <- svc.markPresentationVerified(record.id) + _ <- svc.acceptPresentation(record.id) + + consumer <- ens.consumer[PresentationRecord]("Presentation") + events <- consumer.poll(50) + } yield { + assertTrue(events.size == 5) && + assertTrue(events.head.data.protocolState == ProtocolState.RequestPending) && + assertTrue(events(1).data.protocolState == ProtocolState.RequestSent) + assertTrue(events(2).data.protocolState == ProtocolState.PresentationReceived) && + assertTrue(events(3).data.protocolState == ProtocolState.PresentationVerified) && + assertTrue(events(4).data.protocolState == ProtocolState.PresentationAccepted) + } + }.provide( + ZLayer.succeed(50) >>> EventNotificationServiceImpl.layer, + verifierHappyFlowExpectations.toLayer >>> PresentationServiceNotifier.layer + ), + test("Generates relevant events on presentation verification failed") { + for { + svc <- ZIO.service[PresentationService] + ens <- ZIO.service[EventNotificationService] + + _ <- svc.markPresentationVerificationFailed(DidCommID()) + + consumer <- ens.consumer[PresentationRecord]("Presentation") + events <- consumer.poll(50) + } yield { + assertTrue(events.size == 1) && + assertTrue(events.head.data.protocolState == ProtocolState.PresentationVerificationFailed) + } + }.provide( + ZLayer.succeed(50) >>> EventNotificationServiceImpl.layer, + verifierPresentationVerificationFailedExpectations.toLayer >>> PresentationServiceNotifier.layer + ), + test("Generates relevant events on presentation rejected") { + for { + svc <- ZIO.service[PresentationService] + ens <- ZIO.service[EventNotificationService] + + _ <- svc.rejectPresentation(DidCommID()) + + consumer <- ens.consumer[PresentationRecord]("Presentation") + events <- consumer.poll(50) + } yield { + assertTrue(events.size == 1) && + assertTrue(events.head.data.protocolState == ProtocolState.PresentationRejected) + } + }.provide( + ZLayer.succeed(50) >>> EventNotificationServiceImpl.layer, + verifierRejectPresentationExpectations.toLayer >>> PresentationServiceNotifier.layer + ), + test("Happy flow generates relevant events on the prover side") { + for { + svc <- ZIO.service[PresentationService] + ens <- ZIO.service[EventNotificationService] + + _ <- svc.receiveRequestPresentation(None, requestPresentation) + _ <- svc.acceptRequestPresentation(record.id, Seq.empty) + _ <- svc.markPresentationGenerated(record.id, presentation(record.thid.value)) + _ <- svc.markPresentationSent(record.id) + + consumer <- ens.consumer[PresentationRecord]("Presentation") + events <- consumer.poll(50) + } yield { + assertTrue(events.size == 4) && + assertTrue(events.head.data.protocolState == ProtocolState.RequestReceived) && + assertTrue(events(1).data.protocolState == ProtocolState.PresentationPending) + assertTrue(events(2).data.protocolState == ProtocolState.PresentationGenerated) && + assertTrue(events(3).data.protocolState == ProtocolState.PresentationSent) + } + }.provide( + ZLayer.succeed(50) >>> EventNotificationServiceImpl.layer, + proverHappyFlowExpectations.toLayer >>> PresentationServiceNotifier.layer + ), + test("Happy flow generates relevant events on the prover side") { + for { + svc <- ZIO.service[PresentationService] + ens <- ZIO.service[EventNotificationService] + + _ <- svc.rejectRequestPresentation(record.id) + + consumer <- ens.consumer[PresentationRecord]("Presentation") + events <- consumer.poll(50) + } yield { + assertTrue(events.size == 1) && + assertTrue(events.head.data.protocolState == ProtocolState.RequestRejected) + } + }.provide( + ZLayer.succeed(50) >>> EventNotificationServiceImpl.layer, + proverRejectPresentationRequestExpectations.toLayer >>> PresentationServiceNotifier.layer + ) + ) +} diff --git a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/PresentationServiceSpec.scala b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/PresentationServiceSpec.scala index a809614d8b..dc4f905acb 100644 --- a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/PresentationServiceSpec.scala +++ b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/PresentationServiceSpec.scala @@ -1,44 +1,26 @@ package io.iohk.atala.pollux.core.service import io.circe.parser.decode -import io.circe.syntax._ -import io.iohk.atala.mercury.model.DidId -import io.iohk.atala.mercury.protocol.presentproof._ -import io.iohk.atala.pollux.core.model._ -import io.iohk.atala.pollux.core.model.IssueCredentialRecord._ -import io.iohk.atala.pollux.core.model.PresentationRecord._ +import io.circe.syntax.* +import io.iohk.atala.mercury.model.{AttachmentDescriptor, DidId} +import io.iohk.atala.mercury.protocol.issuecredential.IssueCredential +import io.iohk.atala.mercury.protocol.presentproof.* +import io.iohk.atala.pollux.core.model.* +import io.iohk.atala.pollux.core.model.IssueCredentialRecord.* +import io.iohk.atala.pollux.core.model.PresentationRecord.* import io.iohk.atala.pollux.core.model.error.PresentationError -import io.iohk.atala.pollux.core.model.error.PresentationError._ -import io.iohk.atala.pollux.core.repository.PresentationRepositoryInMemory -import io.iohk.atala.pollux.core.repository.PresentationRepository +import io.iohk.atala.pollux.core.model.error.PresentationError.* +import io.iohk.atala.pollux.core.model.presentation.Options +import io.iohk.atala.pollux.core.repository.{CredentialRepository, PresentationRepository} +import io.iohk.atala.pollux.vc.jwt.* import zio.* import zio.test.* -import java.util.UUID -import io.iohk.atala.mercury.model.AttachmentDescriptor -import io.iohk.atala.pollux.core.model.presentation.Options -import io.iohk.atala.pollux.vc.jwt._ -import io.iohk.atala.pollux.vc.jwt.JwtPresentationPayload -import io.iohk.atala.pollux.core.repository.CredentialRepositoryInMemory -import io.iohk.atala.pollux.core.repository.CredentialRepository -import io.iohk.atala.mercury.DidAgent -import com.nimbusds.jose.jwk.OctetKeyPair -import io.iohk.atala.mercury.PeerDID -import io.iohk.atala.mercury.AgentPeerService + import java.time.Instant -import io.iohk.atala.mercury.protocol.issuecredential.IssueCredential -import java.security.* -import com.nimbusds.jose.jwk.* -object PresentationServiceSpec extends ZIOSpecDefault { +object PresentationServiceSpec extends ZIOSpecDefault with PresentationServiceSpecHelper { type PresentationEnv = PresentationService with PresentationRepository[Task] with CredentialRepository[Task] - val peerDidAgentLayer = - AgentPeerService.makeLayer(PeerDID.makePeerDid(serviceEndpoint = Some("http://localhost:9099"))) - val presentationServiceLayer = - PresentationRepositoryInMemory.layer ++ CredentialRepositoryInMemory.layer ++ peerDidAgentLayer >>> PresentationServiceImpl.layer - val presentationEnvLayer = - PresentationRepositoryInMemory.layer ++ CredentialRepositoryInMemory.layer ++ presentationServiceLayer - def withEnv[E, A](zio: ZIO[PresentationEnv, E, A]): ZIO[Any, E, A] = zio.provideLayer(presentationEnvLayer) @@ -198,7 +180,7 @@ object PresentationServiceSpec extends ZIOSpecDefault { record <- svc.markRequestPresentationSent(record.id) } yield { - assertTrue(record.get.protocolState == PresentationRecord.ProtocolState.RequestSent) + assertTrue(record.protocolState == PresentationRecord.ProtocolState.RequestSent) } }, test("markRequestPresentationRejected returns updated PresentationRecord") { @@ -215,7 +197,7 @@ object PresentationServiceSpec extends ZIOSpecDefault { record <- svc.markRequestPresentationRejected(record.id) } yield { - assertTrue(record.get.protocolState == PresentationRecord.ProtocolState.RequestRejected) + assertTrue(record.protocolState == PresentationRecord.ProtocolState.RequestRejected) }) }, test("receiveRequestPresentation updates the RequestPresentation in PresentatinRecord") { @@ -271,9 +253,9 @@ object PresentationServiceSpec extends ZIOSpecDefault { updateRecord <- svc.acceptRequestPresentation(aRecord.id, credentialsToUse) } yield { - assertTrue(updateRecord.get.connectionId == connectionId) - assertTrue(updateRecord.get.requestPresentationData == Some(requestPresentation)) - assertTrue(updateRecord.get.credentialsToUse.contains(credentialsToUse)) + assertTrue(updateRecord.connectionId == connectionId) + assertTrue(updateRecord.requestPresentationData == Some(requestPresentation)) + assertTrue(updateRecord.credentialsToUse.contains(credentialsToUse)) } ) @@ -287,9 +269,9 @@ object PresentationServiceSpec extends ZIOSpecDefault { updateRecord <- svc.rejectRequestPresentation(aRecord.id) } yield { - assertTrue(updateRecord.get.connectionId == connectionId) - assertTrue(updateRecord.get.requestPresentationData == Some(requestPresentation)) - assertTrue(updateRecord.get.protocolState == PresentationRecord.ProtocolState.RequestRejected) + assertTrue(updateRecord.connectionId == connectionId) + assertTrue(updateRecord.requestPresentationData == Some(requestPresentation)) + assertTrue(updateRecord.protocolState == PresentationRecord.ProtocolState.RequestRejected) } ) }, @@ -307,7 +289,7 @@ object PresentationServiceSpec extends ZIOSpecDefault { record <- svc.markPresentationSent(record.id) } yield { - assertTrue(record.get.protocolState == PresentationRecord.ProtocolState.PresentationSent) + assertTrue(record.protocolState == PresentationRecord.ProtocolState.PresentationSent) }) }, test("receivePresentation updates the PresentatinRecord") { @@ -319,8 +301,8 @@ object PresentationServiceSpec extends ZIOSpecDefault { aRecordReceived <- svc.receivePresentation(p) } yield { - assertTrue(aRecordReceived.get.id == aRecord.id) - assertTrue(aRecordReceived.get.presentationData == Some(p)) + assertTrue(aRecordReceived.id == aRecord.id) + assertTrue(aRecordReceived.presentationData == Some(p)) } ) }, @@ -340,8 +322,8 @@ object PresentationServiceSpec extends ZIOSpecDefault { aRecordAccept <- svc.acceptPresentation(aRecord.id) } yield { - assertTrue(aRecordReceived.get.id == aRecord.id) - assertTrue(aRecordReceived.get.presentationData == Some(p)) + assertTrue(aRecordReceived.id == aRecord.id) + assertTrue(aRecordReceived.presentationData == Some(p)) } ) }, @@ -351,13 +333,18 @@ object PresentationServiceSpec extends ZIOSpecDefault { svc <- ZIO.service[PresentationService] aRecord <- svc.createRecord() p = presentation(aRecord.thid.value) - aRecordReceived <- svc.receivePresentation(p) - aRecordAccept <- svc.markPresentationRejected(aRecord.id) - + _ <- svc.receivePresentation(p) + repo <- ZIO.service[PresentationRepository[Task]] + _ <- repo.updatePresentationRecordProtocolState( + aRecord.id, + PresentationRecord.ProtocolState.PresentationReceived, + PresentationRecord.ProtocolState.PresentationVerified + ) + aRecordReject <- svc.markPresentationRejected(aRecord.id) } yield { - assertTrue(aRecordAccept.get.id == aRecord.id) - assertTrue(aRecordAccept.get.presentationData == Some(p)) - assertTrue(aRecordAccept.get.protocolState == PresentationRecord.ProtocolState.PresentationRejected) + assertTrue(aRecordReject.id == aRecord.id) + assertTrue(aRecordReject.presentationData == Some(p)) + assertTrue(aRecordReject.protocolState == PresentationRecord.ProtocolState.PresentationRejected) } ) }, @@ -368,12 +355,17 @@ object PresentationServiceSpec extends ZIOSpecDefault { aRecord <- svc.createRecord() p = presentation(aRecord.thid.value) aRecordReceived <- svc.receivePresentation(p) - aRecordAccept <- svc.rejectPresentation(aRecord.id) - + repo <- ZIO.service[PresentationRepository[Task]] + _ <- repo.updatePresentationRecordProtocolState( + aRecord.id, + PresentationRecord.ProtocolState.PresentationReceived, + PresentationRecord.ProtocolState.PresentationVerified + ) + aRecordReject <- svc.rejectPresentation(aRecord.id) } yield { - assertTrue(aRecordAccept.get.id == aRecord.id) - assertTrue(aRecordAccept.get.presentationData == Some(p)) - assertTrue(aRecordAccept.get.protocolState == PresentationRecord.ProtocolState.PresentationRejected) + assertTrue(aRecordReject.id == aRecord.id) + assertTrue(aRecordReject.presentationData == Some(p)) + assertTrue(aRecordReject.protocolState == PresentationRecord.ProtocolState.PresentationRejected) } ) }, @@ -392,7 +384,7 @@ object PresentationServiceSpec extends ZIOSpecDefault { record <- svc.markPresentationGenerated(record.id, p) } yield { - assertTrue(record.get.protocolState == PresentationRecord.ProtocolState.PresentationGenerated) + assertTrue(record.protocolState == PresentationRecord.ProtocolState.PresentationGenerated) }) }, test("markProposePresentationSent returns updated PresentationRecord") { @@ -409,7 +401,7 @@ object PresentationServiceSpec extends ZIOSpecDefault { record <- svc.markProposePresentationSent(record.id) } yield { - assertTrue(record.get.protocolState == PresentationRecord.ProtocolState.ProposalSent) + assertTrue(record.protocolState == PresentationRecord.ProtocolState.ProposalSent) }) }, test("receiveProposePresentation updates the PresentatinRecord") { @@ -421,8 +413,8 @@ object PresentationServiceSpec extends ZIOSpecDefault { aRecordReceived <- svc.receiveProposePresentation(p) } yield { - assertTrue(aRecordReceived.get.id == aRecord.id) - assertTrue(aRecordReceived.get.proposePresentationData == Some(p)) + assertTrue(aRecordReceived.id == aRecord.id) + assertTrue(aRecordReceived.proposePresentationData == Some(p)) } ) }, @@ -442,130 +434,12 @@ object PresentationServiceSpec extends ZIOSpecDefault { aRecordAccept <- svc.acceptProposePresentation(aRecord.id) } yield { - assertTrue(aRecordReceived.get.id == aRecord.id) - assertTrue(aRecordReceived.get.proposePresentationData == Some(p)) + assertTrue(aRecordReceived.id == aRecord.id) + assertTrue(aRecordReceived.proposePresentationData == Some(p)) } ) }, ).provideLayer(presentationServiceLayer) } - def createIssuer(did: DID) = { - val keyGen = KeyPairGenerator.getInstance("EC") - keyGen.initialize(Curve.P_256.toECParameterSpec) - val keyPair = keyGen.generateKeyPair() - val privateKey = keyPair.getPrivate - val publicKey = keyPair.getPublic - Issuer( - did = did, - signer = ES256Signer(privateKey), - publicKey = publicKey - ) - } - private def requestCredential = io.iohk.atala.mercury.protocol.issuecredential.RequestCredential( - from = DidId("did:prism:aaa"), - to = DidId("did:prism:bbb"), - thid = Some(UUID.randomUUID.toString), - body = - io.iohk.atala.mercury.protocol.issuecredential.RequestCredential.Body(goal_code = Some("credential issuance")), - attachments = Nil - ) - - private def requestPresentation: RequestPresentation = { - val body = RequestPresentation.Body(goal_code = Some("Presentation Request")) - val presentationAttachmentAsJson = """{ - "challenge": "1f44d55f-f161-4938-a659-f8026467f126", - "domain": "us.gov/DriverLicense", - "credential_manifest": {} - }""" - val prover = DidId("did:peer:Prover") - val verifier = DidId("did:peer:Verifier") - - val attachmentDescriptor = AttachmentDescriptor.buildJsonAttachment(payload = presentationAttachmentAsJson) - RequestPresentation( - body = body, - attachments = Seq(attachmentDescriptor), - to = prover, - from = verifier, - ) - } - - private def proposePresentation(thid: String): ProposePresentation = { - val body = ProposePresentation.Body(goal_code = Some("Propose Presentation")) - val presentationAttachmentAsJson = """{ - "id": "1f44d55f-f161-4938-a659-f8026467f126", - "subject": "subject", - "credential_definition": {} - }""" - val attachmentDescriptor = AttachmentDescriptor.buildJsonAttachment(payload = presentationAttachmentAsJson) - val prover = DidId("did:peer:Prover") - val verifier = DidId("did:peer:Verifier") - ProposePresentation( - body = body, - thid = Some(thid), - attachments = Seq(attachmentDescriptor), - to = verifier, - from = prover - ) - } - private def presentation(thid: String): Presentation = { - val body = Presentation.Body(goal_code = Some("Presentation")) - val presentationAttachmentAsJson = """{ - "id": "1f44d55f-f161-4938-a659-f8026467f126", - "subject": "subject", - "credential_definition": {} - }""" - val attachmentDescriptor = AttachmentDescriptor.buildJsonAttachment(payload = presentationAttachmentAsJson) - val prover = DidId("did:peer:Prover") - val verifier = DidId("did:peer:Verifier") - Presentation( - body = body, - thid = Some(thid), - attachments = Seq(attachmentDescriptor), - to = verifier, - from = prover - ) - } - private def issueCredentialRecord = IssueCredentialRecord( - id = DidCommID(), - createdAt = Instant.ofEpochSecond(Instant.now.getEpochSecond()), - updatedAt = None, - thid = DidCommID(), - schemaId = None, - role = IssueCredentialRecord.Role.Issuer, - subjectId = None, - validityPeriod = None, - automaticIssuance = None, - awaitConfirmation = None, - protocolState = IssueCredentialRecord.ProtocolState.OfferPending, - publicationState = None, - offerCredentialData = None, - requestCredentialData = None, - issueCredentialData = None, - issuedCredentialRaw = None, - issuingDID = None, - metaRetries = 5, - metaNextRetry = Some(Instant.now()), - metaLastFailure = None, - ) - - extension (svc: PresentationService) - def createRecord( - pairwiseVerifierDID: DidId = DidId("did:prism:issuer"), - pairwiseProverDID: DidId = DidId("did:prism:prover-pairwise"), - thid: DidCommID = DidCommID(), - schemaId: String = "schemaId", - connectionId: Option[String] = None, - ) = { - val proofType = ProofType(schemaId, None, None) - svc.createPresentationRecord( - thid = thid, - pairwiseVerifierDID = pairwiseVerifierDID, - pairwiseProverDID = pairwiseProverDID, - connectionId = Some("connectionId"), - proofTypes = Seq(proofType), - options = None, - ) - } - } diff --git a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/PresentationServiceSpecHelper.scala b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/PresentationServiceSpecHelper.scala new file mode 100644 index 0000000000..e5904ed580 --- /dev/null +++ b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/service/PresentationServiceSpecHelper.scala @@ -0,0 +1,153 @@ +package io.iohk.atala.pollux.core.service + +import com.nimbusds.jose.jwk.* +import io.iohk.atala.mercury.model.{AttachmentDescriptor, DidId} +import io.iohk.atala.mercury.protocol.presentproof.* +import io.iohk.atala.mercury.{AgentPeerService, DidAgent, PeerDID} +import io.iohk.atala.pollux.core.model.* +import io.iohk.atala.pollux.core.repository.{ + CredentialRepository, + CredentialRepositoryInMemory, + PresentationRepositoryInMemory +} +import io.iohk.atala.pollux.vc.jwt.* +import zio.* + +import java.security.* +import java.time.Instant +import java.util.UUID + +trait PresentationServiceSpecHelper { + + val peerDidAgentLayer = + AgentPeerService.makeLayer(PeerDID.makePeerDid(serviceEndpoint = Some("http://localhost:9099"))) + val presentationServiceLayer = + PresentationRepositoryInMemory.layer ++ CredentialRepositoryInMemory.layer ++ peerDidAgentLayer >>> PresentationServiceImpl.layer + val presentationEnvLayer = + PresentationRepositoryInMemory.layer ++ CredentialRepositoryInMemory.layer ++ presentationServiceLayer + + def createIssuer(did: DID) = { + val keyGen = KeyPairGenerator.getInstance("EC") + keyGen.initialize(Curve.P_256.toECParameterSpec) + val keyPair = keyGen.generateKeyPair() + val privateKey = keyPair.getPrivate + val publicKey = keyPair.getPublic + Issuer( + did = did, + signer = ES256Signer(privateKey), + publicKey = publicKey + ) + } + + protected def requestCredential = io.iohk.atala.mercury.protocol.issuecredential.RequestCredential( + from = DidId("did:prism:aaa"), + to = DidId("did:prism:bbb"), + thid = Some(UUID.randomUUID.toString), + body = + io.iohk.atala.mercury.protocol.issuecredential.RequestCredential.Body(goal_code = Some("credential issuance")), + attachments = Nil + ) + + protected def requestPresentation: RequestPresentation = { + val body = RequestPresentation.Body(goal_code = Some("Presentation Request")) + val presentationAttachmentAsJson = + """{ + "challenge": "1f44d55f-f161-4938-a659-f8026467f126", + "domain": "us.gov/DriverLicense", + "credential_manifest": {} + }""" + val prover = DidId("did:peer:Prover") + val verifier = DidId("did:peer:Verifier") + + val attachmentDescriptor = AttachmentDescriptor.buildJsonAttachment(payload = presentationAttachmentAsJson) + RequestPresentation( + body = body, + attachments = Seq(attachmentDescriptor), + to = prover, + from = verifier, + ) + } + + protected def proposePresentation(thid: String): ProposePresentation = { + val body = ProposePresentation.Body(goal_code = Some("Propose Presentation")) + val presentationAttachmentAsJson = + """{ + "id": "1f44d55f-f161-4938-a659-f8026467f126", + "subject": "subject", + "credential_definition": {} + }""" + val attachmentDescriptor = AttachmentDescriptor.buildJsonAttachment(payload = presentationAttachmentAsJson) + val prover = DidId("did:peer:Prover") + val verifier = DidId("did:peer:Verifier") + ProposePresentation( + body = body, + thid = Some(thid), + attachments = Seq(attachmentDescriptor), + to = verifier, + from = prover + ) + } + + protected def presentation(thid: String): Presentation = { + val body = Presentation.Body(goal_code = Some("Presentation")) + val presentationAttachmentAsJson = + """{ + "id": "1f44d55f-f161-4938-a659-f8026467f126", + "subject": "subject", + "credential_definition": {} + }""" + val attachmentDescriptor = AttachmentDescriptor.buildJsonAttachment(payload = presentationAttachmentAsJson) + val prover = DidId("did:peer:Prover") + val verifier = DidId("did:peer:Verifier") + Presentation( + body = body, + thid = Some(thid), + attachments = Seq(attachmentDescriptor), + to = verifier, + from = prover + ) + } + + protected def issueCredentialRecord = IssueCredentialRecord( + id = DidCommID(), + createdAt = Instant.ofEpochSecond(Instant.now.getEpochSecond()), + updatedAt = None, + thid = DidCommID(), + schemaId = None, + role = IssueCredentialRecord.Role.Issuer, + subjectId = None, + validityPeriod = None, + automaticIssuance = None, + awaitConfirmation = None, + protocolState = IssueCredentialRecord.ProtocolState.OfferPending, + publicationState = None, + offerCredentialData = None, + requestCredentialData = None, + issueCredentialData = None, + issuedCredentialRaw = None, + issuingDID = None, + metaRetries = 5, + metaNextRetry = Some(Instant.now()), + metaLastFailure = None, + ) + + extension (svc: PresentationService) + def createRecord( + pairwiseVerifierDID: DidId = DidId("did:prism:issuer"), + pairwiseProverDID: DidId = DidId("did:prism:prover-pairwise"), + thid: DidCommID = DidCommID(), + schemaId: String = "schemaId", + connectionId: Option[String] = None, + ) = { + val proofType = ProofType(schemaId, None, None) + svc.createPresentationRecord( + thid = thid, + pairwiseVerifierDID = pairwiseVerifierDID, + pairwiseProverDID = pairwiseProverDID, + connectionId = Some("connectionId"), + proofTypes = Seq(proofType), + options = None, + ) + } + +} diff --git a/prism-agent/service/api/http/prism-agent-openapi-spec.yaml b/prism-agent/service/api/http/prism-agent-openapi-spec.yaml index 2820aef82d..26fe57a603 100644 --- a/prism-agent/service/api/http/prism-agent-openapi-spec.yaml +++ b/prism-agent/service/api/http/prism-agent-openapi-spec.yaml @@ -1992,6 +1992,21 @@ components: type: string description: The current state of the issue credential protocol execution. example: OfferPending + enum: + - OfferPending + - OfferSent + - OfferReceived + - RequestPending + - RequestGenerated + - RequestSent + - RequestReceived + - CredentialPending + - CredentialGenerated + - CredentialSent + - CredentialReceived + - ProblemReportPending + - ProblemReportSent + - ProblemReportReceived jwtCredential: type: string description: The base64-encoded JWT verifiable credential that has been @@ -2140,6 +2155,7 @@ components: - PresentationSent - PresentationReceived - PresentationVerified + - PresentationVerificationFailed - PresentationAccepted - PresentationRejected - ProblemReportPending diff --git a/prism-agent/service/server/src/main/resources/application.conf b/prism-agent/service/server/src/main/resources/application.conf index 7f2522545e..7acd3804c8 100644 --- a/prism-agent/service/server/src/main/resources/application.conf +++ b/prism-agent/service/server/src/main/resources/application.conf @@ -2,78 +2,78 @@ devMode = false devMode = ${?DEV_MODE} iris { - service { - host = "localhost" - host = ${?IRIS_HOST} - port = 8081 - port = ${?IRIS_PORT} - } + service { + host = "localhost" + host = ${?IRIS_HOST} + port = 8081 + port = ${?IRIS_PORT} + } } prismNode { - service = { - host = "localhost" - host = ${?PRISM_NODE_HOST} - port = 50053 - port = ${?PRISM_NODE_PORT} - } + service = { + host = "localhost" + host = ${?PRISM_NODE_HOST} + port = 50053 + port = ${?PRISM_NODE_PORT} + } } castor { - database { - host = "localhost" - host = ${?CASTOR_DB_HOST} - port = 5432 - port = ${?CASTOR_DB_PORT} - databaseName = "castor" - databaseName = ${?CASTOR_DB_NAME} - username = "postgres" - username = ${?CASTOR_DB_USER} - password = "postgres" - password = ${?CASTOR_DB_PASSWORD} - awaitConnectionThreads = 8 - } + database { + host = "localhost" + host = ${?CASTOR_DB_HOST} + port = 5432 + port = ${?CASTOR_DB_PORT} + databaseName = "castor" + databaseName = ${?CASTOR_DB_NAME} + username = "postgres" + username = ${?CASTOR_DB_USER} + password = "postgres" + password = ${?CASTOR_DB_PASSWORD} + awaitConnectionThreads = 8 + } } pollux { - database { - host = "localhost" - host = ${?POLLUX_DB_HOST} - port = 5432 - port = ${?POLLUX_DB_PORT} - databaseName = "pollux" - databaseName = ${?POLLUX_DB_NAME} - username = "postgres" - username = ${?POLLUX_DB_USER} - password = "postgres" - password = ${?POLLUX_DB_PASSWORD} - awaitConnectionThreads = 8 - } - issueBgJobRecordsLimit = 25 - issueBgJobRecurrenceDelay = 2 seconds - issueBgJobProcessingParallelism = 5 - presentationBgJobRecordsLimit = 25 - presentationBgJobRecurrenceDelay = 2 seconds - presentationBgJobProcessingParallelism = 5 + database { + host = "localhost" + host = ${?POLLUX_DB_HOST} + port = 5432 + port = ${?POLLUX_DB_PORT} + databaseName = "pollux" + databaseName = ${?POLLUX_DB_NAME} + username = "postgres" + username = ${?POLLUX_DB_USER} + password = "postgres" + password = ${?POLLUX_DB_PASSWORD} + awaitConnectionThreads = 8 + } + issueBgJobRecordsLimit = 25 + issueBgJobRecurrenceDelay = 2 seconds + issueBgJobProcessingParallelism = 5 + presentationBgJobRecordsLimit = 25 + presentationBgJobRecurrenceDelay = 2 seconds + presentationBgJobProcessingParallelism = 5 } connect { - database { - host = "localhost" - host = ${?CONNECT_DB_HOST} - port = 5432 - port = ${?CONNECT_DB_PORT} - databaseName = "connect" - databaseName = ${?CONNECT_DB_NAME} - username = "postgres" - username = ${?CONNECT_DB_USER} - password = "postgres" - password = ${?CONNECT_DB_PASSWORD} - awaitConnectionThreads = 8 - } - connectBgJobRecordsLimit = 25 - connectBgJobRecurrenceDelay = 2 seconds - connectBgJobProcessingParallelism = 5 + database { + host = "localhost" + host = ${?CONNECT_DB_HOST} + port = 5432 + port = ${?CONNECT_DB_PORT} + databaseName = "connect" + databaseName = ${?CONNECT_DB_NAME} + username = "postgres" + username = ${?CONNECT_DB_USER} + password = "postgres" + password = ${?CONNECT_DB_PASSWORD} + awaitConnectionThreads = 8 + } + connectBgJobRecordsLimit = 25 + connectBgJobRecurrenceDelay = 2 seconds + connectBgJobProcessingParallelism = 5 } agent { @@ -135,4 +135,9 @@ agent { token = ${?VAULT_TOKEN} } } + webhookPublisher { + url = ${?WEBHOOK_URL} + apiKey = ${?WEBHOOK_API_KEY} + parallelism = ${?WEBHOOK_PARALLELISM} + } } diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/notification/JsonEventEncoders.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/notification/JsonEventEncoders.scala new file mode 100644 index 0000000000..01f962f480 --- /dev/null +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/notification/JsonEventEncoders.scala @@ -0,0 +1,38 @@ +package io.iohk.atala.agent.notification + +import io.iohk.atala.agent.walletapi.model.ManagedDIDDetail +import io.iohk.atala.castor.controller.http.ManagedDID +import io.iohk.atala.castor.controller.http.ManagedDID.* +import io.iohk.atala.connect.controller.http.Connection +import io.iohk.atala.connect.controller.http.Connection.* +import io.iohk.atala.connect.core.model.ConnectionRecord +import io.iohk.atala.event.notification.{Event, EventNotificationServiceError} +import io.iohk.atala.issue.controller.http.IssueCredentialRecord +import io.iohk.atala.issue.controller.http.IssueCredentialRecord.* +import io.iohk.atala.pollux.core.model.{ + IssueCredentialRecord as PolluxIssueCredentialRecord, + PresentationRecord as PolluxPresentationRecord +} +import io.iohk.atala.presentproof.controller.http.PresentationStatus +import zio.* +import zio.json.* + +object JsonEventEncoders { + + implicit val connectionRecordEncoder: JsonEncoder[ConnectionRecord] = + Connection.encoder.contramap(implicitly[Conversion[ConnectionRecord, Connection]].convert) + + implicit val issueCredentialRecordEncoder: JsonEncoder[PolluxIssueCredentialRecord] = + IssueCredentialRecord.encoder.contramap( + implicitly[Conversion[PolluxIssueCredentialRecord, IssueCredentialRecord]].convert + ) + + implicit val presentationRecordEncoder: JsonEncoder[PolluxPresentationRecord] = + PresentationStatus.encoder.contramap(implicitly[Conversion[PolluxPresentationRecord, PresentationStatus]].convert) + + implicit val managedDIDDetailEncoder: JsonEncoder[ManagedDIDDetail] = + ManagedDID.encoder.contramap(implicitly[Conversion[ManagedDIDDetail, ManagedDID]].convert) + + implicit def eventEncoder[T](implicit jsonEncoder: JsonEncoder[T]): JsonEncoder[Event[T]] = + DeriveJsonEncoder.gen[Event[T]] +} diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/notification/WebhookPublisher.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/notification/WebhookPublisher.scala new file mode 100644 index 0000000000..81fc65d7e3 --- /dev/null +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/notification/WebhookPublisher.scala @@ -0,0 +1,97 @@ +package io.iohk.atala.agent.notification +import io.iohk.atala.agent.notification.JsonEventEncoders.* +import io.iohk.atala.agent.notification.WebhookPublisherError.{InvalidWebhookURL, UnexpectedError} +import io.iohk.atala.agent.server.config.AppConfig +import io.iohk.atala.agent.walletapi.model.ManagedDIDDetail +import io.iohk.atala.connect.core.model.ConnectionRecord +import io.iohk.atala.event.notification.{Event, EventConsumer, EventNotificationService} +import io.iohk.atala.pollux.core.model.{IssueCredentialRecord, PresentationRecord} +import zio.* +import zio.http.* +import zio.http.model.* +import zio.json.* + +import java.net.URL + +class WebhookPublisher(appConfig: AppConfig, notificationService: EventNotificationService) { + + private val config = appConfig.agent.webhookPublisher + private val baseHeaders = + config.apiKey.map(key => Headers.authorization(key)).getOrElse(Headers.empty) ++ + Headers.contentType(HeaderValues.applicationJson) + + private val parallelism = config.parallelism match { + case Some(p) if p < 1 => 1 + case Some(p) if p > 10 => 10 + case Some(p) => p + case None => 1 + } + + val run: ZIO[Client, WebhookPublisherError, Unit] = config.url match { + case Some(url) => + for { + url <- ZIO.attempt(URL(url)).mapError(th => InvalidWebhookURL(s"$url [${th.getMessage}]")) + connectConsumer <- notificationService + .consumer[ConnectionRecord]("Connect") + .mapError(e => UnexpectedError(e.toString)) + issueConsumer <- notificationService + .consumer[IssueCredentialRecord]("Issue") + .mapError(e => UnexpectedError(e.toString)) + presentationConsumer <- notificationService + .consumer[PresentationRecord]("Presentation") + .mapError(e => UnexpectedError(e.toString)) + didStateConsumer <- notificationService + .consumer[ManagedDIDDetail]("DIDDetail") + .mapError(e => UnexpectedError(e.toString)) + _ <- pollAndNotify(connectConsumer, url).forever.debug.forkDaemon + _ <- pollAndNotify(issueConsumer, url).forever.debug.forkDaemon + _ <- pollAndNotify(presentationConsumer, url).forever.debug.forkDaemon + _ <- pollAndNotify(didStateConsumer, url).forever.debug.forkDaemon + } yield () + case None => ZIO.unit + } + + private[this] def pollAndNotify[A](consumer: EventConsumer[A], url: URL)(implicit encoder: JsonEncoder[A]) = { + for { + _ <- ZIO.log(s"Polling $parallelism event(s)") + events <- consumer.poll(parallelism).mapError(e => UnexpectedError(e.toString)) + _ <- ZIO.log(s"Got ${events.size} event(s)") + _ <- ZIO.foreachPar(events)(e => + notifyWebhook(e, url) + .retry(Schedule.spaced(5.second) && Schedule.recurs(2)) + .catchAll(e => ZIO.logError(s"Webhook permanently failing, with last error being: ${e.msg}")) + ) + } yield () + } + + private[this] def notifyWebhook[A](event: Event[A], url: URL)(implicit + encoder: JsonEncoder[A] + ): ZIO[Client, UnexpectedError, Unit] = { + for { + _ <- ZIO.log(s"Sending event: $event to HTTP webhook URL: $url.") + response <- Client + .request( + url = url.toString, + method = Method.POST, + headers = baseHeaders, + content = Body.fromString(event.toJson) + ) + .timeoutFail(new RuntimeException("Client request timed out"))(5.seconds) + .mapError(t => UnexpectedError(s"Webhook request error: $t")) + resp <- response match + case Response(status, _, _, _, _) if status.isSuccess => + ZIO.unit + case Response(status, _, _, _, maybeHttpError) => + ZIO.fail( + UnexpectedError( + s"Unsuccessful webhook response: [status: $status] [error: ${maybeHttpError.getOrElse("none")}]" + ) + ) + } yield resp + } +} + +object WebhookPublisher { + val layer: URLayer[AppConfig & EventNotificationService, WebhookPublisher] = + ZLayer.fromFunction(WebhookPublisher(_, _)) +} diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/notification/WebhookPublisherError.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/notification/WebhookPublisherError.scala new file mode 100644 index 0000000000..4d629c0684 --- /dev/null +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/notification/WebhookPublisherError.scala @@ -0,0 +1,8 @@ +package io.iohk.atala.agent.notification + +sealed trait WebhookPublisherError + +object WebhookPublisherError { + case class InvalidWebhookURL(msg: String) extends WebhookPublisherError + case class UnexpectedError(msg: String) extends WebhookPublisherError +} diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala index bd65c5004a..56c4e0cd14 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala @@ -3,14 +3,19 @@ package io.iohk.atala.agent.server import com.nimbusds.jose.crypto.bc.BouncyCastleProviderSingleton import io.iohk.atala.agent.server.http.ZioHttpClient import io.iohk.atala.agent.server.sql.Migrations as AgentMigrations -import io.iohk.atala.agent.walletapi.service.{ManagedDIDService, ManagedDIDServiceImpl} +import io.iohk.atala.agent.walletapi.service.{ + ManagedDIDService, + ManagedDIDServiceImpl, + ManagedDIDServiceWithEventNotificationImpl +} import io.iohk.atala.agent.walletapi.sql.JdbcDIDNonSecretStorage import io.iohk.atala.castor.controller.{DIDControllerImpl, DIDRegistrarControllerImpl} import io.iohk.atala.castor.core.service.DIDServiceImpl import io.iohk.atala.castor.core.util.DIDOperationValidator import io.iohk.atala.connect.controller.ConnectionControllerImpl -import io.iohk.atala.connect.core.service.ConnectionServiceImpl +import io.iohk.atala.connect.core.service.{ConnectionServiceImpl, ConnectionServiceNotifier} import io.iohk.atala.connect.sql.repository.{JdbcConnectionRepository, Migrations as ConnectMigrations} +import io.iohk.atala.event.notification.EventNotificationServiceImpl import io.iohk.atala.issue.controller.IssueControllerImpl import io.iohk.atala.mercury.* import io.iohk.atala.pollux.core.service.* @@ -31,6 +36,7 @@ import io.iohk.atala.resolvers.DIDResolver import io.iohk.atala.system.controller.SystemControllerImpl import io.micrometer.prometheus.{PrometheusConfig, PrometheusMeterRegistry} import zio.* +import zio.http.Client import zio.metrics.connectors.micrometer import zio.metrics.connectors.micrometer.MicrometerConfig import zio.metrics.jvm.DefaultJvmMetrics @@ -123,12 +129,12 @@ object MainApp extends ZIOAppDefault { DIDResolver.layer, HttpURIDereferencerImpl.layer, // service - ConnectionServiceImpl.layer, + ConnectionServiceImpl.layer >>> ConnectionServiceNotifier.layer, CredentialSchemaServiceImpl.layer, - CredentialServiceImpl.layer, + CredentialServiceImpl.layer >>> CredentialServiceNotifier.layer, DIDServiceImpl.layer, - ManagedDIDServiceImpl.layer, - PresentationServiceImpl.layer, + ManagedDIDServiceWithEventNotificationImpl.layer, + PresentationServiceImpl.layer >>> PresentationServiceNotifier.layer, VerificationPolicyServiceImpl.layer, // grpc GrpcModule.irisStubLayer, @@ -141,6 +147,11 @@ object MainApp extends ZIOAppDefault { RepoModule.polluxTransactorLayer >>> JdbcCredentialSchemaRepository.layer, RepoModule.polluxTransactorLayer >>> JdbcPresentationRepository.layer, RepoModule.polluxTransactorLayer >>> JdbcVerificationPolicyRepository.layer, + // event notification service + ZLayer.succeed(500) >>> EventNotificationServiceImpl.layer, + // HTTP client + Client.default, + Scope.default ) } yield app diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/PrismAgentApp.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/PrismAgentApp.scala index 86fc47d92f..dd195afe5a 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/PrismAgentApp.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/PrismAgentApp.scala @@ -1,13 +1,11 @@ package io.iohk.atala.agent.server +import io.iohk.atala.agent.notification.WebhookPublisher import io.iohk.atala.agent.server.config.AppConfig -import io.iohk.atala.agent.server.http.ZHttp4sBlazeServer -import io.iohk.atala.agent.server.http.ZHttpEndpoints -import io.iohk.atala.agent.server.jobs.BackgroundJobs -import io.iohk.atala.agent.server.jobs.ConnectBackgroundJobs +import io.iohk.atala.agent.server.http.{ZHttp4sBlazeServer, ZHttpEndpoints} +import io.iohk.atala.agent.server.jobs.{BackgroundJobs, ConnectBackgroundJobs} import io.iohk.atala.agent.walletapi.service.ManagedDIDService -import io.iohk.atala.castor.controller.DIDRegistrarServerEndpoints -import io.iohk.atala.castor.controller.DIDServerEndpoints +import io.iohk.atala.castor.controller.{DIDRegistrarServerEndpoints, DIDServerEndpoints} import io.iohk.atala.castor.core.service.DIDService import io.iohk.atala.connect.controller.ConnectionServerEndpoints import io.iohk.atala.connect.core.service.ConnectionService @@ -33,6 +31,7 @@ object PrismAgentApp { _ <- syncDIDPublicationStateFromDltJob.fork _ <- AgentHttpServer.run.fork fiber <- DidCommHttpServer.run(didCommServicePort).fork + _ <- WebhookPublisher.layer.build.map(_.get[WebhookPublisher]).flatMap(_.run.debug.fork) _ <- fiber.join *> ZIO.log(s"Server End") _ <- ZIO.never } yield () diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/config/AppConfig.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/config/AppConfig.scala index c20e9f84a2..2d44765e1b 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/config/AppConfig.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/config/AppConfig.scala @@ -90,12 +90,19 @@ final case class VerificationConfig(options: Options) { } } +final case class WebhookPublisherConfig( + url: Option[String], + apiKey: Option[String], + parallelism: Option[Int] +) + final case class AgentConfig( httpEndpoint: HttpEndpointConfig, didCommServiceEndpointUrl: String, database: DatabaseConfig, verification: VerificationConfig, - secretStorage: SecretStorageConfig + secretStorage: SecretStorageConfig, + webhookPublisher: WebhookPublisherConfig ) final case class HttpEndpointConfig(http: HttpConfig) diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/connect/controller/http/Connection.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/connect/controller/http/Connection.scala index 8a1a9d4252..c143e9dade 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/connect/controller/http/Connection.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/connect/controller/http/Connection.scala @@ -84,6 +84,8 @@ object Connection { kind = "Connection", ) + given Conversion[model.ConnectionRecord, Connection] = fromDomain + object annotations { object connectionId extends Annotation[UUID]( diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/issue/controller/http/IssueCredentialRecord.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/issue/controller/http/IssueCredentialRecord.scala index 317587db09..94aef744e8 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/issue/controller/http/IssueCredentialRecord.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/issue/controller/http/IssueCredentialRecord.scala @@ -116,6 +116,8 @@ object IssueCredentialRecord { }) ) + given Conversion[PolluxIssueCredentialRecord, IssueCredentialRecord] = fromDomain + object annotations { object subjectId diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/presentproof/controller/PresentProofControllerImpl.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/presentproof/controller/PresentProofControllerImpl.scala index 9515536805..3c1c2849ef 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/presentproof/controller/PresentProofControllerImpl.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/presentproof/controller/PresentProofControllerImpl.scala @@ -84,11 +84,11 @@ class PresentProofControllerImpl( ): IO[ErrorResponse, PresentationStatus] = { val result: ZIO[Any, ErrorResponse | PresentationError, PresentationStatus] = for { didCommId <- ZIO.succeed(DidCommID(id.toString)) - maybeRecord <- requestPresentationAction.action match { + record <- requestPresentationAction.action match { case "request-accept" => presentationService.acceptRequestPresentation( recordId = didCommId, - crecentialsToUse = requestPresentationAction.proofId.getOrElse(Seq.empty) + credentialsToUse = requestPresentationAction.proofId.getOrElse(Seq.empty) ) case "request-reject" => presentationService.rejectRequestPresentation(didCommId) case "presentation-accept" => presentationService.acceptPresentation(didCommId) @@ -102,10 +102,6 @@ class PresentProofControllerImpl( ) ) } - record <- ZIO - .fromOption(maybeRecord) - .mapError(_ => ErrorResponse.notFound(detail = Some(s"Presentation record not found: $id"))) - } yield PresentationStatus.fromDomain(record) result.mapError { diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/presentproof/controller/http/PresentationStatus.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/presentproof/controller/http/PresentationStatus.scala index f7300386e1..ccec1c1fd9 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/presentproof/controller/http/PresentationStatus.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/presentproof/controller/http/PresentationStatus.scala @@ -52,6 +52,8 @@ object PresentationStatus { ) } + given Conversion[PresentationRecord, PresentationStatus] = fromDomain + object annotations { object presentationId extends Annotation[String]( diff --git a/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/service/ManagedDIDServiceImpl.scala b/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/service/ManagedDIDServiceImpl.scala index a008e55d16..92f3241f0a 100644 --- a/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/service/ManagedDIDServiceImpl.scala +++ b/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/service/ManagedDIDServiceImpl.scala @@ -4,7 +4,7 @@ import io.iohk.atala.agent.walletapi.crypto.Apollo import io.iohk.atala.agent.walletapi.model.* import io.iohk.atala.agent.walletapi.model.error.{*, given} import io.iohk.atala.agent.walletapi.service.ManagedDIDService.DEFAULT_MASTER_KEY_ID -import io.iohk.atala.agent.walletapi.service.handler.{DIDUpdateHandler, PublicationHandler} +import io.iohk.atala.agent.walletapi.service.handler.{DIDCreateHandler, DIDUpdateHandler, PublicationHandler} import io.iohk.atala.agent.walletapi.storage.{DIDNonSecretStorage, DIDSecretStorage} import io.iohk.atala.agent.walletapi.util.* import io.iohk.atala.castor.core.model.did.* @@ -17,12 +17,12 @@ import zio.* import scala.language.implicitConversions import java.security.{PrivateKey as JavaPrivateKey, PublicKey as JavaPublicKey} import scala.collection.immutable.ArraySeq -import io.iohk.atala.agent.walletapi.service.handler.DIDCreateHandler +import scala.language.implicitConversions /** A wrapper around Castor's DIDService providing key-management capability. Analogous to the secretAPI in * indy-wallet-sdk. */ -final class ManagedDIDServiceImpl private[walletapi] ( +class ManagedDIDServiceImpl private[walletapi] ( didService: DIDService, didOpValidator: DIDOperationValidator, private[walletapi] val secretStorage: DIDSecretStorage, @@ -263,28 +263,28 @@ final class ManagedDIDServiceImpl private[walletapi] ( } yield awaitingConfirmationOps ++ pendingOps } - private def computeNewDIDLineageStatusAndPersist[E]( + protected def computeNewDIDLineageStatusAndPersist[E]( updateLineage: DIDUpdateLineage - )(using c1: Conversion[DIDOperationError, E], c2: Conversion[CommonWalletStorageError, E]): IO[E, Unit] = { + )(using c1: Conversion[DIDOperationError, E], c2: Conversion[CommonWalletStorageError, E]): IO[E, Boolean] = { for { maybeOperationDetail <- didService .getScheduledDIDOperationDetail(updateLineage.operationId.toArray) .mapError[E](e => e) newStatus = maybeOperationDetail.fold(ScheduledDIDOperationStatus.Rejected)(_.status) - _ <- nonSecretStorage + updated <- nonSecretStorage .setDIDUpdateLineageStatus(updateLineage.operationId.toArray, newStatus) .mapError[E](CommonWalletStorageError.apply) .when(updateLineage.status != newStatus) - } yield () + } yield updated.isDefined } /** Reconcile state with DLT and write new state to the storage */ - private def computeNewDIDStateFromDLTAndPersist[E]( + protected def computeNewDIDStateFromDLTAndPersist[E]( did: CanonicalPrismDID )(using c1: Conversion[CommonWalletStorageError, E], c2: Conversion[DIDOperationError, E] - ): IO[E, Unit] = { + ): IO[E, Boolean] = { for { maybeCurrentState <- nonSecretStorage .getManagedDIDState(did) @@ -292,13 +292,13 @@ final class ManagedDIDServiceImpl private[walletapi] ( maybeNewPubState <- ZIO .foreach(maybeCurrentState)(i => computeNewDIDStateFromDLT(i.publicationState)) .mapError[E](e => e) - _ <- ZIO.foreach(maybeCurrentState zip maybeNewPubState) { case (currentState, newPubState) => + updated <- ZIO.foreach(maybeCurrentState zip maybeNewPubState) { case (currentState, newPubState) => nonSecretStorage .updateManagedDID(did, ManagedDIDStatePatch(newPubState)) .mapError[E](CommonWalletStorageError.apply) .when(currentState.publicationState != newPubState) } - } yield () + } yield updated.flatten.isDefined } /** Reconcile state with DLT and return an updated state */ diff --git a/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/service/ManagedDIDServiceWithEventNotificationImpl.scala b/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/service/ManagedDIDServiceWithEventNotificationImpl.scala new file mode 100644 index 0000000000..61409adace --- /dev/null +++ b/prism-agent/service/wallet-api/src/main/scala/io/iohk/atala/agent/walletapi/service/ManagedDIDServiceWithEventNotificationImpl.scala @@ -0,0 +1,84 @@ +package io.iohk.atala.agent.walletapi.service + +import io.iohk.atala.agent.walletapi.crypto.Apollo +import io.iohk.atala.agent.walletapi.model.{ManagedDIDState, ManagedDIDDetail} +import io.iohk.atala.agent.walletapi.model.error.CommonWalletStorageError +import io.iohk.atala.agent.walletapi.storage.{DIDNonSecretStorage, DIDSecretStorage} +import io.iohk.atala.agent.walletapi.util.SeedResolver +import io.iohk.atala.castor.core.model.did.CanonicalPrismDID +import io.iohk.atala.castor.core.model.error +import io.iohk.atala.castor.core.model.error.DIDOperationError +import io.iohk.atala.castor.core.service.DIDService +import io.iohk.atala.castor.core.util.DIDOperationValidator +import io.iohk.atala.event.notification.{Event, EventNotificationService} +import zio.{IO, RLayer, Semaphore, ZIO, ZLayer} + +class ManagedDIDServiceWithEventNotificationImpl( + didService: DIDService, + didOpValidator: DIDOperationValidator, + override private[walletapi] val secretStorage: DIDSecretStorage, + override private[walletapi] val nonSecretStorage: DIDNonSecretStorage, + apollo: Apollo, + seed: Array[Byte], + createDIDSem: Semaphore, + eventNotificationService: EventNotificationService +) extends ManagedDIDServiceImpl( + didService, + didOpValidator, + secretStorage, + nonSecretStorage, + apollo, + seed, + createDIDSem + ) { + + private val didStatusUpdatedEventName = "DIDStatusUpdated" + + override protected def computeNewDIDStateFromDLTAndPersist[E]( + did: CanonicalPrismDID + )(using + c1: Conversion[CommonWalletStorageError, E], + c2: Conversion[DIDOperationError, E] + ): IO[E, Boolean] = { + for { + updated <- super.computeNewDIDStateFromDLTAndPersist(did) + _ <- ZIO.when(updated) { + val result = for { + maybeUpdatedDID <- nonSecretStorage.getManagedDIDState(did) + updatedDID <- ZIO.fromOption(maybeUpdatedDID) + producer <- eventNotificationService.producer[ManagedDIDDetail]("DIDDetail") + _ <- producer.send(Event(didStatusUpdatedEventName, ManagedDIDDetail(did, updatedDID))) + } yield () + result.catchAll(e => ZIO.logError(s"Notification service error: $e")) + } + } yield updated + } +} + +object ManagedDIDServiceWithEventNotificationImpl { + val layer: RLayer[ + DIDOperationValidator & DIDService & DIDSecretStorage & DIDNonSecretStorage & Apollo & SeedResolver & + EventNotificationService, + ManagedDIDService + ] = ZLayer.fromZIO { + for { + didService <- ZIO.service[DIDService] + didOpValidator <- ZIO.service[DIDOperationValidator] + secretStorage <- ZIO.service[DIDSecretStorage] + nonSecretStorage <- ZIO.service[DIDNonSecretStorage] + apollo <- ZIO.service[Apollo] + seed <- ZIO.serviceWithZIO[SeedResolver](_.resolve) + createDIDSem <- Semaphore.make(1) + eventNotificationService <- ZIO.service[EventNotificationService] + } yield ManagedDIDServiceWithEventNotificationImpl( + didService, + didOpValidator, + secretStorage, + nonSecretStorage, + apollo, + seed, + createDIDSem, + eventNotificationService + ) + } +}