Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prism-agent): simple event mechanism using webhook #575

Merged
merged 67 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
997aff5
feat(prism-agent): introduce webhook publisher config
bvoiturier Jun 16, 2023
c66eaf2
feat(prism-agent): add necessary event notification interfaces
bvoiturier Jun 16, 2023
d2f86b2
feat(prism-agent): add event notification service 'in-memory' impleme…
bvoiturier Jun 16, 2023
2fa6d45
feat(prism-agent): implement dummy webhook publisher consumer
bvoiturier Jun 16, 2023
3d4f351
feat(prism-agent): add notification service calls for issue credentia…
bvoiturier Jun 16, 2023
c39cf8d
feat(prism-agent): wire notification service and webhook publisher in…
bvoiturier Jun 16, 2023
c7d3b65
feat(prism-agent): add fake webhook URL config param
bvoiturier Jun 16, 2023
2ad8a7a
feat(prism-agent): notify when credential is generated
bvoiturier Jun 20, 2023
21c7ba1
feat(prism-agent): notify issue credential steps occuring on the DIDC…
bvoiturier Jun 20, 2023
4153b87
feat(prism-agent): extract event notification classes to a new shared…
bvoiturier Jun 20, 2023
04f53fe
feat(pollux): add credential service subclass that notifies of events
bvoiturier Jun 20, 2023
62be00f
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jun 21, 2023
544a81f
feat(pollux): use global ZIO http client layer in HttpURIDereferencer…
bvoiturier Jun 22, 2023
905e629
feat(prism-agent): implement real HTTP calls in Webhook publisher
bvoiturier Jun 22, 2023
88a7bd4
feat(prism-agent): include WEBHOOK_API_KEY in request Authorization h…
bvoiturier Jun 22, 2023
1c49fcb
chore(prism-agent): declare WALLET_SEED and WEBHOOK_API_KEY in docker…
bvoiturier Jun 22, 2023
7411f87
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jun 23, 2023
d51b540
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jun 26, 2023
081dff1
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jun 26, 2023
de36607
chore(prism-agent): complete main branch merging
bvoiturier Jun 26, 2023
ca6de76
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jun 27, 2023
d542e0d
feature(prism-agent): refactor event notification and make it more ge…
bvoiturier Jun 27, 2023
8100b74
feature(prism-agent): add connect & presentation consumers in Webhook…
bvoiturier Jun 28, 2023
4d3fef5
chore(prism-agent): use postgres secret storage implementation in def…
bvoiturier Jun 28, 2023
f869d6f
feat(prism-agent): add event notification sending for connect flow
bvoiturier Jun 29, 2023
1bdb9ac
chore(pollux): use RecordIdNotFound in error channel instead of an Op…
bvoiturier Jun 29, 2023
d9c9beb
feat(prism-agent): implement event notifiation sending in presentatio…
bvoiturier Jun 29, 2023
4ec0384
test(pollux): fix unit test related to presentation rejection
bvoiturier Jun 29, 2023
6f49a61
chore(prism-agent): get rid of event encoder/decoder (useless for now)
bvoiturier Jun 29, 2023
a871973
chore(pollux): rename event notification service implementation
bvoiturier Jun 30, 2023
4b03187
test(prism-agent): make capacity of event queue configurable and add …
bvoiturier Jun 30, 2023
67954db
chore(prism-agent): add event notification service dependency to wall…
bvoiturier Jun 30, 2023
7c28764
feat(prism-agent): add new 'DIDState' topic and notify of DID publish…
bvoiturier Jun 30, 2023
4365519
feat(prism-agent): consume new 'DIDState' topic events (only DID publ…
bvoiturier Jun 30, 2023
c08c9ab
chore(prism-agent): run scalafmt
bvoiturier Jun 30, 2023
a3e723d
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jul 3, 2023
ae8372a
chore(prism-agent): scalafmtAll
bvoiturier Jul 4, 2023
a6e2e14
test(connect): add unit tests for connection service with event notif
bvoiturier Jul 4, 2023
87d7d03
test(pollux): add unit tests for credential service with event notif
bvoiturier Jul 4, 2023
a66217c
test(pollux): move presentation service spec utility methods to helpe…
bvoiturier Jul 4, 2023
8b2ea65
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jul 4, 2023
15ce6a5
chore(prism-agent): scalafmtAll
bvoiturier Jul 4, 2023
6c32311
chore(pollux): fix method typo
bvoiturier Jul 5, 2023
4901251
test(pollux): add unit tests for presentation flow notifications, int…
bvoiturier Jul 5, 2023
ffd21cb
test(connect): refactor notifier unit tests to use ZIO mock for Conne…
bvoiturier Jul 5, 2023
4b434cd
test(pollux): refactor credential service notifier unit tests to use …
bvoiturier Jul 5, 2023
d1108a0
fix(pollux): fix presentation accepted/rejected event not being sent
bvoiturier Jul 5, 2023
eb14280
chore(prism-agent): scalafmtAll
bvoiturier Jul 5, 2023
03b2a47
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jul 6, 2023
2d36015
test(pollux): fix unit tests
bvoiturier Jul 6, 2023
6a3839d
feat: add Json serialization for events #1
yshyn-iohk Jul 6, 2023
c954544
feat: add application/json header
yshyn-iohk Jul 6, 2023
c27e7bc
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jul 6, 2023
678f8cc
doc(prism-agent): update protocol state enums in OAS
bvoiturier Jul 7, 2023
b71a0f0
doc(prism-agent): add tutorial for simple event mechanism webhooks
bvoiturier Jul 7, 2023
e47aaf4
doc(prism-agent): update webhook tutorial
bvoiturier Jul 7, 2023
f97431f
doc(prism-agent): update Docusaurus sidebar for webhook
bvoiturier Jul 7, 2023
0ea4124
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jul 7, 2023
2181bfa
feat: add event type field to all events
yshyn-iohk Jul 10, 2023
8aff7cc
chore(prism-agent): use ZIO ConcurrentMap to store notification queues
bvoiturier Jul 10, 2023
ecfbb7c
Merge branch 'main' into feature/ATL-4093-simple-event-mechanism
bvoiturier Jul 10, 2023
45348b2
chore(prism-agent): fixing last PR comments
bvoiturier Jul 10, 2023
93cf3bd
doc(prism-agent): document DID-related events in webhook.md
bvoiturier Jul 10, 2023
42bfd82
fix(prism-agent): use ZIO sliding queue (discarding old messages) ins…
bvoiturier Jul 10, 2023
94c273d
feat(prism-agent): use a 5 seconds request timeout in Webhook publisher
bvoiturier Jul 10, 2023
1d8b2f2
test(prism-agent): fix unit test for sliding queue
bvoiturier Jul 10, 2023
df46486
chore(prism-agent): fix URLs in webhook.md
bvoiturier Jul 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ lazy val D_Connect = new {

// Dependency Modules
private lazy val baseDependencies: Seq[ModuleID] =
Seq(D.zio, D.zioTest, D.zioTestSbt, D.zioTestMagnolia, D.testcontainersPostgres, logback)
Seq(D.zio, D.zioTest, D.zioTestSbt, D.zioTestMagnolia, D.zioMock, D.testcontainersPostgres, logback)

// Project Dependencies
lazy val coreDependencies: Seq[ModuleID] =
Expand Down Expand Up @@ -223,6 +223,7 @@ lazy val D_Pollux = new {
D.zioTest,
D.zioTestSbt,
D.zioTestMagnolia,
D.zioMock,
D.munit,
D.munitZio,
prismCrypto,
Expand Down Expand Up @@ -280,6 +281,17 @@ lazy val D_Pollux_VC_JWT = new {
lazy val polluxVcJwtDependencies: Seq[ModuleID] = baseDependencies
}

lazy val D_EventNotification = new {
val zio = "dev.zio" %% "zio" % V.zio
val zioConcurrent = "dev.zio" %% "zio-concurrent" % V.zio
val zioTest = "dev.zio" %% "zio-test" % V.zio % Test
val zioTestSbt = "dev.zio" %% "zio-test-sbt" % V.zio % Test
val zioTestMagnolia = "dev.zio" %% "zio-test-magnolia" % V.zio % Test

val zioDependencies: Seq[ModuleID] = Seq(zio, zioConcurrent, zioTest, zioTestSbt, zioTestMagnolia)
val baseDependencies: Seq[ModuleID] = zioDependencies
}

lazy val D_PrismAgent = new {

// Added here to make prism-crypto works.
Expand Down Expand Up @@ -632,7 +644,7 @@ lazy val polluxCore = project
)
.dependsOn(shared)
.dependsOn(polluxVcJWT)
.dependsOn(protocolIssueCredential, protocolPresentProof, resolver, agentDidcommx)
.dependsOn(protocolIssueCredential, protocolPresentProof, resolver, agentDidcommx, eventNotification)

lazy val polluxDoobie = project
.in(file("pollux/lib/sql-doobie"))
Expand Down Expand Up @@ -687,7 +699,7 @@ lazy val connectCore = project
Test / publishArtifact := true
)
.dependsOn(shared)
.dependsOn(protocolConnection, protocolReportProblem)
.dependsOn(protocolConnection, protocolReportProblem, eventNotification)

lazy val connectDoobie = project
.in(file("connect/lib/sql-doobie"))
Expand All @@ -699,6 +711,17 @@ lazy val connectDoobie = project
.dependsOn(shared)
.dependsOn(connectCore % "compile->compile;test->test")

// ############################
// #### Event Notification ####
// ############################

lazy val eventNotification = project
.in(file("event-notification"))
.settings(
name := "event-notification",
libraryDependencies ++= D_EventNotification.baseDependencies
)

// #####################
// #### Prism Agent ####
// #####################
Expand All @@ -711,8 +734,11 @@ lazy val prismAgentWalletAPI = project
name := "prism-agent-wallet-api",
libraryDependencies ++= D_PrismAgent.keyManagementDependencies
)
.dependsOn(agentDidcommx)
.dependsOn(castorCore)
.dependsOn(
agentDidcommx,
castorCore,
eventNotification
)

lazy val prismAgentServer = project
.in(file("prism-agent/service/server"))
Expand Down Expand Up @@ -741,7 +767,8 @@ lazy val prismAgentServer = project
polluxAnoncreds,
connectCore,
connectDoobie,
castorCore
castorCore,
eventNotification
)

// ##################
Expand Down Expand Up @@ -822,6 +849,7 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq(
prismAgentWalletAPI,
prismAgentServer,
mediator,
eventNotification,
)

lazy val root = project
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package io.iohk.atala.connect.core.service

import io.iohk.atala.connect.core.model.ConnectionRecord
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.event.notification.{Event, EventNotificationService}
import io.iohk.atala.mercury.model.DidId
import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse}
import zio.{IO, URLayer, ZIO, ZLayer}

import java.util.UUID

class ConnectionServiceNotifier(
svc: ConnectionService,
eventNotificationService: EventNotificationService
) extends ConnectionService {

private val connectionUpdatedEvent = "ConnectionUpdated"

override def createConnectionInvitation(
label: Option[String],
pairwiseDID: DidId
): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.createConnectionInvitation(label, pairwiseDID))

override def receiveConnectionInvitation(invitation: String): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.receiveConnectionInvitation(invitation))

override def acceptConnectionInvitation(
recordId: UUID,
pairwiseDid: DidId
): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.acceptConnectionInvitation(recordId, pairwiseDid))

override def markConnectionRequestSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.markConnectionRequestSent(recordId))

override def receiveConnectionRequest(request: ConnectionRequest): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.receiveConnectionRequest(request))

override def acceptConnectionRequest(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.acceptConnectionRequest(recordId))

override def markConnectionResponseSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.markConnectionResponseSent(recordId))

override def receiveConnectionResponse(response: ConnectionResponse): IO[ConnectionServiceError, ConnectionRecord] =
notifyOnSuccess(svc.receiveConnectionResponse(response))

private[this] def notifyOnSuccess(effect: IO[ConnectionServiceError, ConnectionRecord]) =
for {
record <- effect
_ <- notify(record)
} yield record

private[this] def notify(record: ConnectionRecord) = {
val result = for {
producer <- eventNotificationService.producer[ConnectionRecord]("Connect")
_ <- producer.send(Event(connectionUpdatedEvent, record))
} yield ()
result.catchAll(e => ZIO.logError(s"Notification service error: $e"))
}

override def getConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Option[ConnectionRecord]] =
svc.getConnectionRecord(recordId)

override def getConnectionRecordByThreadId(thid: String): IO[ConnectionServiceError, Option[ConnectionRecord]] =
svc.getConnectionRecordByThreadId(thid)

override def deleteConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Int] =
svc.deleteConnectionRecord(recordId)

override def reportProcessingFailure(recordId: UUID, failReason: Option[String]): IO[ConnectionServiceError, Unit] =
svc.reportProcessingFailure(recordId, failReason)

override def getConnectionRecords(): IO[ConnectionServiceError, Seq[ConnectionRecord]] =
svc.getConnectionRecords()

override def getConnectionRecordsByStates(
ignoreWithZeroRetries: Boolean,
limit: Int,
states: ConnectionRecord.ProtocolState*
): IO[ConnectionServiceError, Seq[ConnectionRecord]] =
svc.getConnectionRecordsByStates(ignoreWithZeroRetries, limit, states: _*)
}

object ConnectionServiceNotifier {
val layer: URLayer[ConnectionService & EventNotificationService, ConnectionService] =
ZLayer.fromFunction(ConnectionServiceNotifier(_, _))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.iohk.atala.connect.core.service

import io.iohk.atala.connect.core.model.ConnectionRecord
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.mercury.model.DidId
import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse}
import zio.mock.{Mock, Proxy}
import zio.{IO, URLayer, ZIO, ZLayer, mock}

import java.util.UUID

object MockConnectionService extends Mock[ConnectionService] {

object CreateConnectionInvitation extends Effect[(Option[String], DidId), ConnectionServiceError, ConnectionRecord]
object ReceiveConnectionInvitation extends Effect[String, ConnectionServiceError, ConnectionRecord]
object AcceptConnectionInvitation extends Effect[(UUID, DidId), ConnectionServiceError, ConnectionRecord]
object MarkConnectionRequestSent extends Effect[UUID, ConnectionServiceError, ConnectionRecord]
object ReceiveConnectionRequest extends Effect[ConnectionRequest, ConnectionServiceError, ConnectionRecord]
object AcceptConnectionRequest extends Effect[UUID, ConnectionServiceError, ConnectionRecord]
object MarkConnectionResponseSent extends Effect[UUID, ConnectionServiceError, ConnectionRecord]
object ReceiveConnectionResponse extends Effect[ConnectionResponse, ConnectionServiceError, ConnectionRecord]

override val compose: URLayer[mock.Proxy, ConnectionService] = ZLayer {
for {
proxy <- ZIO.service[Proxy]
} yield new ConnectionService {
override def createConnectionInvitation(
label: Option[String],
pairwiseDID: DidId
): IO[ConnectionServiceError, ConnectionRecord] =
proxy(CreateConnectionInvitation, label, pairwiseDID)

override def receiveConnectionInvitation(invitation: String): IO[ConnectionServiceError, ConnectionRecord] =
proxy(ReceiveConnectionInvitation, invitation)

override def acceptConnectionInvitation(
recordId: UUID,
pairwiseDid: DidId
): IO[ConnectionServiceError, ConnectionRecord] =
proxy(AcceptConnectionInvitation, recordId, pairwiseDid)

override def markConnectionRequestSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
proxy(MarkConnectionRequestSent, recordId)

override def receiveConnectionRequest(request: ConnectionRequest): IO[ConnectionServiceError, ConnectionRecord] =
proxy(ReceiveConnectionRequest, request)

override def acceptConnectionRequest(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
proxy(AcceptConnectionRequest, recordId)

override def markConnectionResponseSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] =
proxy(MarkConnectionResponseSent, recordId)

override def receiveConnectionResponse(
response: ConnectionResponse
): IO[ConnectionServiceError, ConnectionRecord] =
proxy(ReceiveConnectionResponse, response)

override def getConnectionRecords(): IO[ConnectionServiceError, Seq[ConnectionRecord]] = ???

override def getConnectionRecordsByStates(
ignoreWithZeroRetries: Boolean,
limit: Int,
states: ConnectionRecord.ProtocolState*
): IO[ConnectionServiceError, Seq[ConnectionRecord]] = ???

override def getConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Option[ConnectionRecord]] = ???

override def getConnectionRecordByThreadId(thid: String): IO[ConnectionServiceError, Option[ConnectionRecord]] =
???

override def deleteConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Int] = ???

override def reportProcessingFailure(
recordId: UUID,
failReason: Option[String]
): IO[ConnectionServiceError, Unit] = ???
}
}
}
Loading