From 1a1702fc4e62b4a03a4e4ee32ac7419ea67a4ea1 Mon Sep 17 00:00:00 2001 From: Shailesh Patil <53746241+mineme0110@users.noreply.github.com> Date: Wed, 6 Sep 2023 09:46:45 +0100 Subject: [PATCH] fix(prism-agent): update invitation expiration on connection request (#687) Signed-off-by: Shailesh Patil --- .../model/error/ConnectionServiceError.scala | 1 + .../core/service/ConnectionService.scala | 6 ++- .../core/service/ConnectionServiceImpl.scala | 16 +++++++- .../service/ConnectionServiceNotifier.scala | 8 +++- .../core/service/MockConnectionService.scala | 6 ++- .../service/ConnectionServiceImplSpec.scala | 19 ++++++--- .../ConnectionServiceNotifierSpec.scala | 3 +- .../agent/server/DidCommHttpServer.scala | 18 ++++++--- .../server/jobs/ConnectBackgroundJobs.scala | 39 ++----------------- .../controller/ConnectionController.scala | 2 + 10 files changed, 65 insertions(+), 53 deletions(-) diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/model/error/ConnectionServiceError.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/model/error/ConnectionServiceError.scala index 88dff81e80..f70e53badd 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/model/error/ConnectionServiceError.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/model/error/ConnectionServiceError.scala @@ -12,5 +12,6 @@ object ConnectionServiceError { final case class UnexpectedError(msg: String) extends ConnectionServiceError final case class InvalidFlowStateError(msg: String) extends ConnectionServiceError final case class InvitationAlreadyReceived(msg: String) extends ConnectionServiceError + final case class InvitationExpired(msg: String) extends ConnectionServiceError } diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionService.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionService.scala index 818c37dc53..2713b551f4 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionService.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionService.scala @@ -24,7 +24,11 @@ trait ConnectionService { def markConnectionRequestSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] - def receiveConnectionRequest(request: ConnectionRequest): IO[ConnectionServiceError, ConnectionRecord] + // def receiveConnectionRequest(request: ConnectionRequest): IO[ConnectionServiceError, ConnectionRecord] + def receiveConnectionRequest( + request: ConnectionRequest, + expirationTime: Option[Duration] + ): IO[ConnectionServiceError, ConnectionRecord] def acceptConnectionRequest(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceImpl.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceImpl.scala index 88b0385c22..b3d8a47855 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceImpl.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceImpl.scala @@ -16,7 +16,7 @@ import zio.* import java.rmi.UnexpectedException import java.time.Instant import java.util.UUID - +import java.time.Duration private class ConnectionServiceImpl( connectionRepository: ConnectionRepository[Task], maxRetries: Int = 5, // TODO move to config @@ -176,13 +176,25 @@ private class ConnectionServiceImpl( } override def receiveConnectionRequest( - request: ConnectionRequest + request: ConnectionRequest, + expirationTime: Option[Duration] = None ): IO[ConnectionServiceError, ConnectionRecord] = for { record <- getRecordFromThreadIdAndState( Some(request.thid.orElse(request.pthid).getOrElse(request.id)), ProtocolState.InvitationGenerated ) + _ <- expirationTime.fold { + ZIO.unit + } { expiryDuration => + val actualDuration = Duration.between(record.createdAt, Instant.now()) + if (actualDuration > expiryDuration) { + for { + _ <- markConnectionInvitationExpired(record.id) + result <- ZIO.fail(InvitationExpired(record.id.toString)) + } yield result + } else ZIO.unit + } _ <- connectionRepository .updateWithConnectionRequest(record.id, request, ProtocolState.ConnectionRequestReceived, maxRetries) .flatMap { diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala index 129637ec7b..9f0a0f6c0e 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala @@ -7,6 +7,7 @@ import io.iohk.atala.mercury.model.DidId import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse} import zio.{IO, URLayer, ZIO, ZLayer} +import java.time.Duration import java.util.UUID class ConnectionServiceNotifier( @@ -34,8 +35,11 @@ class ConnectionServiceNotifier( override def markConnectionRequestSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] = notifyOnSuccess(svc.markConnectionRequestSent(recordId)) - override def receiveConnectionRequest(request: ConnectionRequest): IO[ConnectionServiceError, ConnectionRecord] = - notifyOnSuccess(svc.receiveConnectionRequest(request)) + override def receiveConnectionRequest( + request: ConnectionRequest, + expirationTime: Option[Duration] + ): IO[ConnectionServiceError, ConnectionRecord] = + notifyOnSuccess(svc.receiveConnectionRequest(request, expirationTime)) override def acceptConnectionRequest(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] = notifyOnSuccess(svc.acceptConnectionRequest(recordId)) diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala index 38f7dee1f0..2ba4ae7658 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala @@ -7,6 +7,7 @@ import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionR import zio.mock.{Mock, Proxy} import zio.{IO, URLayer, ZIO, ZLayer, mock} +import java.time.Duration import java.util.UUID object MockConnectionService extends Mock[ConnectionService] { @@ -44,7 +45,10 @@ object MockConnectionService extends Mock[ConnectionService] { override def markConnectionRequestSent(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] = proxy(MarkConnectionRequestSent, recordId) - override def receiveConnectionRequest(request: ConnectionRequest): IO[ConnectionServiceError, ConnectionRecord] = + override def receiveConnectionRequest( + request: ConnectionRequest, + expirationTime: Option[Duration] + ): IO[ConnectionServiceError, ConnectionRecord] = proxy(ReceiveConnectionRequest, request) override def acceptConnectionRequest(recordId: UUID): IO[ConnectionServiceError, ConnectionRecord] = diff --git a/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceImplSpec.scala b/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceImplSpec.scala index 16b3719445..5a6b8bc75a 100644 --- a/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceImplSpec.scala +++ b/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceImplSpec.scala @@ -167,7 +167,7 @@ object ConnectionServiceImplSpec extends ZIOSpecDefault { ) // FIXME: Should the service return an Option while we have dedicated "not found" error for that case !? connectionRequest = maybeAcceptedInvitationRecord.connectionRequest.get - maybeReceivedRequestConnectionRecord <- inviterSvc.receiveConnectionRequest(connectionRequest) + maybeReceivedRequestConnectionRecord <- inviterSvc.receiveConnectionRequest(connectionRequest, None) allInviterRecords <- inviterSvc.getConnectionRecords() } yield { val updatedRecord = maybeReceivedRequestConnectionRecord @@ -194,8 +194,8 @@ object ConnectionServiceImplSpec extends ZIOSpecDefault { ) connectionRequest = maybeAcceptedInvitationRecord.connectionRequest.get connectionRecordUpdated <- inviterSvc.markConnectionInvitationExpired(inviterRecord.id) - - exit <- inviterSvc.receiveConnectionRequest(connectionRequest).exit + expiryTime = Duration.fromSeconds(300) + exit <- inviterSvc.receiveConnectionRequest(connectionRequest, Some(expiryTime)).exit } yield { assertTrue(exit match @@ -220,7 +220,11 @@ object ConnectionServiceImplSpec extends ZIOSpecDefault { DidId("did:peer:INVITEE") ) connectionRequest = maybeAcceptedInvitationRecord.connectionRequest.get - maybeReceivedRequestConnectionRecord <- inviterSvc.receiveConnectionRequest(connectionRequest) + expiryTime = Duration.fromSeconds(300) + maybeReceivedRequestConnectionRecord <- inviterSvc.receiveConnectionRequest( + connectionRequest, + Some(expiryTime) + ) maybeAcceptedRequestConnectionRecord <- inviterSvc.acceptConnectionRequest(inviterRecord.id) allInviterRecords <- inviterSvc.getConnectionRecords() } yield { @@ -250,7 +254,12 @@ object ConnectionServiceImplSpec extends ZIOSpecDefault { ) connectionRequest = maybeAcceptedInvitationRecord.connectionRequest.get _ <- inviteeSvc.markConnectionRequestSent(inviteeRecord.id) - maybeReceivedRequestConnectionRecord <- inviterSvc.receiveConnectionRequest(connectionRequest) + expiryTime = Duration.fromSeconds(300) + + maybeReceivedRequestConnectionRecord <- inviterSvc.receiveConnectionRequest( + connectionRequest, + Some(expiryTime) + ) maybeAcceptedRequestConnectionRecord <- inviterSvc.acceptConnectionRequest(inviterRecord.id) connectionResponseMessage <- ZIO.fromEither( maybeAcceptedRequestConnectionRecord.connectionResponse.get.makeMessage.asJson.as[Message] diff --git a/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifierSpec.scala b/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifierSpec.scala index 955e23969e..e8b0dbabd0 100644 --- a/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifierSpec.scala +++ b/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifierSpec.scala @@ -78,7 +78,8 @@ object ConnectionServiceNotifierSpec extends ZIOSpecDefault { thid = Some(connectionRecord.thid), pthid = None, body = ConnectionRequest.Body() - ) + ), + None ) _ <- cs.acceptConnectionRequest(connectionRecord.id) _ <- cs.markConnectionResponseSent(connectionRecord.id) diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/DidCommHttpServer.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/DidCommHttpServer.scala index 5102664466..cbce50db9c 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/DidCommHttpServer.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/DidCommHttpServer.scala @@ -3,6 +3,7 @@ package io.iohk.atala.agent.server import io.circe.* import io.circe.parser.* import io.iohk.atala.agent.server.DidCommHttpServerError.{DIDCommMessageParsingError, RequestBodyParsingError} +import io.iohk.atala.agent.server.config.AppConfig import io.iohk.atala.agent.walletapi.model.error.DIDSecretStorageError import io.iohk.atala.agent.walletapi.service.ManagedDIDService import io.iohk.atala.connect.core.model.error.ConnectionServiceError @@ -42,7 +43,7 @@ object DidCommHttpServer { private def didCommServiceEndpoint: HttpApp[ DidOps & DidAgent & CredentialService & PresentationService & ConnectionService & ManagedDIDService & HttpClient & - DidAgent & DIDResolver, + DidAgent & DIDResolver & AppConfig, Nothing ] = Http.collectZIO[Request] { case Method.GET -> !! / "did" => @@ -147,8 +148,11 @@ object DidCommHttpServer { /* * Connect */ - private val handleConnect - : PartialFunction[Message, ZIO[ConnectionService, DIDCommMessageParsingError | ConnectionServiceError, Unit]] = { + private val handleConnect: PartialFunction[Message, ZIO[ + ConnectionService & AppConfig, + DIDCommMessageParsingError | ConnectionServiceError, + Unit + ]] = { case msg if msg.piuri == ConnectionRequest.`type` => for { connectionRequest <- ZIO @@ -156,8 +160,12 @@ object DidCommHttpServer { .mapError(DIDCommMessageParsingError.apply) _ <- ZIO.logInfo("As an Inviter in connect got ConnectionRequest: " + connectionRequest) connectionService <- ZIO.service[ConnectionService] - maybeRecord <- connectionService.receiveConnectionRequest(connectionRequest) - _ <- connectionService.acceptConnectionRequest(maybeRecord.id) + config <- ZIO.service[AppConfig] + record <- connectionService.receiveConnectionRequest( + connectionRequest, + Some(config.connect.connectInvitationExpiry) + ) + _ <- connectionService.acceptConnectionRequest(record.id) } yield () case msg if msg.piuri == ConnectionResponse.`type` => for { diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala index dc6b9ff30a..302a02dda1 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala @@ -17,8 +17,6 @@ import io.iohk.atala.shared.utils.aspects.CustomMetricsAspect import io.iohk.atala.shared.utils.DurationOps.toMetricsSeconds import zio.* import zio.metrics.* -import java.time.{Instant, Duration} - object ConnectBackgroundJobs { val didCommExchanges = { @@ -29,7 +27,6 @@ object ConnectBackgroundJobs { .getConnectionRecordsByStates( ignoreWithZeroRetries = true, limit = config.connect.connectBgJobRecordsLimit, - ConnectionRecord.ProtocolState.InvitationGenerated, ConnectionRecord.ProtocolState.ConnectionRequestPending, ConnectionRecord.ProtocolState.ConnectionResponsePending ) @@ -40,7 +37,7 @@ object ConnectBackgroundJobs { private[this] def performExchange( record: ConnectionRecord - ): URIO[DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService & AppConfig, Unit] = { + ): URIO[DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService, Unit] = { import ProtocolState.* import Role.* @@ -60,9 +57,6 @@ object ConnectBackgroundJobs { val InviterConnectionResponseMsgSuccess = counterMetric( "connection_flow_inviter_connection_response_msg_success_counter" ) - val InviterConnectionInvitationExpiredSuccess = counterMetric( - "connection_flow_inviter_connection_invitation_expired_success_counter" - ) val InviteeProcessConnectionRecordPendingSuccess = counterMetric( "connection_flow_invitee_process_connection_record_success_counter" ) @@ -160,6 +154,7 @@ object ConnectBackgroundJobs { else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ InviterConnectionResponseMsgFailed } } yield () + inviterProcessFlow @@ InviterProcessConnectionRecordPendingSuccess.trackSuccess @@ InviterProcessConnectionRecordPendingFailed.trackError @@ -167,35 +162,7 @@ object ConnectBackgroundJobs { @@ Metric .gauge("connection_flow_inviter_process_connection_record_ms_gauge") .trackDurationWith(_.toMetricsSeconds) - case ConnectionRecord( - id, - createdAt, - _, - _, - _, - Inviter, - InvitationGenerated, - _, - _, - _, - metaRetries, - _, - _ - ) if metaRetries > 0 => - for { - connectionService <- ZIO.service[ConnectionService] - config <- ZIO.service[AppConfig] - expired <- ZIO.succeed { - val expiryDuration = config.connect.connectInvitationExpiry - val actualDuration = Duration.between(createdAt, Instant.now()) - actualDuration > expiryDuration - } - _ <- - if (expired) { - connectionService.markConnectionInvitationExpired(id) - @@ InviterConnectionInvitationExpiredSuccess - } else ZIO.unit - } yield () + case _ => ZIO.unit } diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/connect/controller/ConnectionController.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/connect/controller/ConnectionController.scala index 2bb9e0b81a..e484f1fe5a 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/connect/controller/ConnectionController.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/connect/controller/ConnectionController.scala @@ -49,4 +49,6 @@ object ConnectionController { ErrorResponse.badRequest(title = "InvalidFlowState", detail = Some(msg)) case ConnectionServiceError.InvitationAlreadyReceived(msg) => ErrorResponse.badRequest(title = "InvitationAlreadyReceived", detail = Some(msg)) + case ConnectionServiceError.InvitationExpired(msg) => + ErrorResponse.badRequest(title = "InvitationExpired", detail = Some(msg)) }