Skip to content

Commit

Permalink
feat(prism-agent): fix infinite reprocessing of records in error (#528)
Browse files Browse the repository at this point in the history
* chore(prism-agent): make records processing method names more explicit

* feat(connect): add 'meta_next_retry' column to connect repo and align 'updateAfterFail()' methods of connect/issue/verify

* chore(prism-agent): split PresentationService trait and impl

* chore(prism-agent): align 'reportProcessingFailure()' method of connect/credential/presentation services

* feat(prism-agent): report connect record processing failure on all errors

* feat(prism-agent): improve potential error handling when reporting record processing failure

* chore(prism-agent): clean-up unused on-chain VC publication code

* fix(prism-agent): fix scala.MatchError in bg jobs 'catchAll' block

* feat(prism-agent): do not laod records with max_retries=0 in connect bg job

* chore(prism-agent): set bg job recurrence delay to 5 sec (was 2) and processing parallelism to 4 (was 25)

* feat(prism-agent): add configurable parameter limiting the number of records processed in each background job iteration

* chore(prism-agent): set bg job recurrence delay to 2 sec given it only starts when previous execution ended

* chore(prism-agent): add newline at the end of file
  • Loading branch information
bvoiturier authored May 17, 2023
1 parent cb01657 commit 904a2dc
Show file tree
Hide file tree
Showing 27 changed files with 939 additions and 885 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import io.iohk.atala.connect.core.model.ConnectionRecord.{ProtocolState, Role}
import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse}
import io.iohk.atala.mercury.protocol.invitation.v2.Invitation

import java.util.UUID
import java.time.Instant
import java.util.UUID

/** @param id
* @param createdAt
Expand Down Expand Up @@ -36,6 +36,7 @@ case class ConnectionRecord(
connectionRequest: Option[ConnectionRequest],
connectionResponse: Option[ConnectionResponse],
metaRetries: Int,
metaNextRetry: Option[Instant],
metaLastFailure: Option[String]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ import io.iohk.atala.connect.core.model.ConnectionRecord
trait ConnectionRepository[F[_]] {
def createConnectionRecord(record: ConnectionRecord): F[Int]

def getConnectionRecords(): F[Seq[ConnectionRecord]]
def getConnectionRecords: F[Seq[ConnectionRecord]]

def getConnectionRecordsByStates(states: ConnectionRecord.ProtocolState*): F[Seq[ConnectionRecord]]
def getConnectionRecordsByStates(
ignoreWithZeroRetries: Boolean,
limit: Int,
states: ConnectionRecord.ProtocolState*
): F[Seq[ConnectionRecord]]

def getConnectionRecord(recordId: UUID): F[Option[ConnectionRecord]]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package io.iohk.atala.connect.core.repository

import io.iohk.atala.connect.core.model.ConnectionRecord
import io.iohk.atala.connect.core.model.error.ConnectionRepositoryError._
import io.iohk.atala.connect.core.model.ConnectionRecord.ProtocolState
import io.iohk.atala.mercury.protocol.connection.ConnectionRequest
import io.iohk.atala.mercury.protocol.connection.ConnectionResponse
import zio._
import io.iohk.atala.connect.core.model.error.ConnectionRepositoryError.*
import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse}
import zio.*

import java.time.Instant
import java.util.UUID
Expand Down Expand Up @@ -141,16 +140,23 @@ class ConnectionRepositoryInMemory(storeRef: Ref[Map[UUID, ConnectionRecord]]) e
} yield store.values.find(_.thid.contains(thid))
}

override def getConnectionRecords(): Task[Seq[ConnectionRecord]] = {
override def getConnectionRecords: Task[Seq[ConnectionRecord]] = {
for {
store <- storeRef.get
} yield store.values.toSeq
}

override def getConnectionRecordsByStates(states: ConnectionRecord.ProtocolState*): Task[Seq[ConnectionRecord]] = {
override def getConnectionRecordsByStates(
ignoreWithZeroRetries: Boolean,
limit: Int,
states: ConnectionRecord.ProtocolState*
): Task[Seq[ConnectionRecord]] = {
for {
store <- storeRef.get
} yield store.values.filter(rec => states.contains(rec.protocolState)).toSeq
} yield store.values
.filter(rec => (ignoreWithZeroRetries & rec.metaRetries > 0) & states.contains(rec.protocolState))
.take(limit)
.toSeq
}

override def createConnectionRecord(record: ConnectionRecord): Task[Int] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package io.iohk.atala.connect.core.service

import io.iohk.atala.connect.core.model.ConnectionRecord
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import zio._
import java.util.UUID
import io.iohk.atala.connect.core.model.error.ConnectionServiceError.RepositoryError
import io.iohk.atala.mercury.model.DidId
import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse}
import io.iohk.atala.mercury.protocol.invitation.v2.Invitation
import io.iohk.atala.mercury.protocol.connection.ConnectionRequest
import io.iohk.atala.mercury.protocol.connection.ConnectionResponse
import zio.*

import java.util.UUID

trait ConnectionService {

Expand Down Expand Up @@ -36,6 +37,8 @@ trait ConnectionService {
def getConnectionRecords(): IO[ConnectionServiceError, Seq[ConnectionRecord]]

def getConnectionRecordsByStates(
ignoreWithZeroRetries: Boolean,
limit: Int,
states: ConnectionRecord.ProtocolState*
): IO[ConnectionServiceError, Seq[ConnectionRecord]]

Expand All @@ -44,6 +47,6 @@ trait ConnectionService {

def deleteConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Int]

def reportProcessingFailure(recordId: UUID, failReason: Option[String]): IO[ConnectionServiceError, Int]
def reportProcessingFailure(recordId: UUID, failReason: Option[String]): IO[ConnectionServiceError, Unit]

}
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package io.iohk.atala.connect.core.service

import zio._
import io.iohk.atala.connect.core.repository.ConnectionRepository
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.connect.core.model.error.ConnectionServiceError._
import io.iohk.atala.connect.core.model.ConnectionRecord
import io.iohk.atala.connect.core.model.ConnectionRecord._
import java.util.UUID
import io.iohk.atala.mercury._
import io.iohk.atala.connect.core.model.ConnectionRecord.*
import io.iohk.atala.connect.core.model.error.ConnectionServiceError
import io.iohk.atala.connect.core.model.error.ConnectionServiceError.*
import io.iohk.atala.connect.core.repository.ConnectionRepository
import io.iohk.atala.mercury.*
import io.iohk.atala.mercury.model.DidId
import io.iohk.atala.mercury.protocol.connection.*
import io.iohk.atala.mercury.protocol.invitation.v2.Invitation
import io.iohk.atala.mercury.protocol.connection._
import java.time.Instant
import java.rmi.UnexpectedException
import io.iohk.atala.shared.utils.Base64Utils
import zio.*

import java.rmi.UnexpectedException
import java.time.Instant
import java.util.UUID

private class ConnectionServiceImpl(
connectionRepository: ConnectionRepository[Task],
Expand All @@ -39,6 +40,7 @@ private class ConnectionServiceImpl(
connectionRequest = None,
connectionResponse = None,
metaRetries = maxRetries,
metaNextRetry = Some(Instant.now()),
metaLastFailure = None,
)
)
Expand All @@ -53,18 +55,19 @@ private class ConnectionServiceImpl(

override def getConnectionRecords(): IO[ConnectionServiceError, Seq[ConnectionRecord]] = {
for {
records <- connectionRepository
.getConnectionRecords()
records <- connectionRepository.getConnectionRecords
.mapError(RepositoryError.apply)
} yield records
}

override def getConnectionRecordsByStates(
ignoreWithZeroRetries: Boolean,
limit: Int,
states: ProtocolState*
): IO[ConnectionServiceError, Seq[ConnectionRecord]] = {
for {
records <- connectionRepository
.getConnectionRecordsByStates(states: _*)
.getConnectionRecordsByStates(ignoreWithZeroRetries, limit, states: _*)
.mapError(RepositoryError.apply)
} yield records
}
Expand Down Expand Up @@ -106,6 +109,7 @@ private class ConnectionServiceImpl(
connectionRequest = None,
connectionResponse = None,
metaRetries = maxRetries,
metaNextRetry = Some(Instant.now()),
metaLastFailure = None,
)
)
Expand Down Expand Up @@ -295,10 +299,14 @@ private class ConnectionServiceImpl(
} yield record
}

def reportProcessingFailure(recordId: UUID, failReason: Option[String]): IO[ConnectionServiceError, Int] =
def reportProcessingFailure(recordId: UUID, failReason: Option[String]): IO[ConnectionServiceError, Unit] =
connectionRepository
.updateAfterFail(recordId, failReason)
.mapError(RepositoryError.apply)
.flatMap {
case 1 => ZIO.unit
case n => ZIO.fail(UnexpectedError(s"Invalid number of records updated: $n"))
}

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package io.iohk.atala.connect.core.repository

import io.iohk.atala.connect.core.model.ConnectionRecord
import io.iohk.atala.connect.core.model.ConnectionRecord._
import io.iohk.atala.connect.core.model.error.ConnectionRepositoryError._
import io.iohk.atala.connect.core.model.ConnectionRecord.*
import io.iohk.atala.connect.core.model.error.ConnectionRepositoryError.*
import io.iohk.atala.mercury.model.DidId
import io.iohk.atala.mercury.protocol.connection.ConnectionRequest
import io.iohk.atala.mercury.protocol.connection.ConnectionResponse
import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse}
import io.iohk.atala.mercury.protocol.invitation.v2.Invitation
import zio.Cause
import zio.Exit
import zio.Task
import zio.ZIO
import zio.test.Assertion._
import zio.test._
import zio.test.*
import zio.test.Assertion.*
import zio.{Cause, Exit, Task, ZIO}

import java.time.Instant
import java.util.UUID
Expand All @@ -23,7 +19,7 @@ object ConnectionRepositorySpecSuite {

private def connectionRecord = ConnectionRecord(
UUID.randomUUID,
Instant.ofEpochSecond(Instant.now.getEpochSecond()),
Instant.ofEpochSecond(Instant.now.getEpochSecond),
None,
None,
None,
Expand All @@ -38,6 +34,7 @@ object ConnectionRepositorySpecSuite {
None,
None,
maxRetries,
Some(Instant.now),
None
)

Expand Down Expand Up @@ -99,7 +96,7 @@ object ConnectionRepositorySpecSuite {
bRecord = connectionRecord
_ <- repo.createConnectionRecord(aRecord)
_ <- repo.createConnectionRecord(bRecord)
records <- repo.getConnectionRecords()
records <- repo.getConnectionRecords
} yield {
assertTrue(records.size == 2) &&
assertTrue(records.contains(aRecord)) &&
Expand Down Expand Up @@ -127,8 +124,14 @@ object ConnectionRepositorySpecSuite {
ProtocolState.ConnectionResponsePending,
1
)
invitationGeneratedRecords <- repo.getConnectionRecordsByStates(ProtocolState.InvitationGenerated)
invitationGeneratedRecords <- repo.getConnectionRecordsByStates(
ignoreWithZeroRetries = true,
limit = 10,
ProtocolState.InvitationGenerated
)
otherRecords <- repo.getConnectionRecordsByStates(
ignoreWithZeroRetries = true,
limit = 10,
ProtocolState.ConnectionRequestReceived,
ProtocolState.ConnectionResponsePending
)
Expand All @@ -149,11 +152,29 @@ object ConnectionRepositorySpecSuite {
_ <- repo.createConnectionRecord(aRecord)
_ <- repo.createConnectionRecord(bRecord)
_ <- repo.createConnectionRecord(cRecord)
records <- repo.getConnectionRecordsByStates()
records <- repo.getConnectionRecordsByStates(ignoreWithZeroRetries = true, limit = 10)
} yield {
assertTrue(records.isEmpty)
}
},
test("getConnectionRecordsByStates returns an a subset of records when limit is specified") {
for {
repo <- ZIO.service[ConnectionRepository[Task]]
aRecord = connectionRecord
bRecord = connectionRecord
cRecord = connectionRecord
_ <- repo.createConnectionRecord(aRecord)
_ <- repo.createConnectionRecord(bRecord)
_ <- repo.createConnectionRecord(cRecord)
records <- repo.getConnectionRecordsByStates(
ignoreWithZeroRetries = true,
limit = 2,
ProtocolState.InvitationGenerated
)
} yield {
assertTrue(records.size == 2)
}
},
test("deleteRecord deletes an existing record") {
for {
repo <- ZIO.service[ConnectionRepository[Task]]
Expand All @@ -162,7 +183,7 @@ object ConnectionRepositorySpecSuite {
_ <- repo.createConnectionRecord(aRecord)
_ <- repo.createConnectionRecord(bRecord)
count <- repo.deleteConnectionRecord(aRecord.id)
records <- repo.getConnectionRecords()
records <- repo.getConnectionRecords
} yield {
assertTrue(count == 1) &&
assertTrue(records.size == 1) &&
Expand All @@ -177,7 +198,7 @@ object ConnectionRepositorySpecSuite {
_ <- repo.createConnectionRecord(aRecord)
_ <- repo.createConnectionRecord(bRecord)
count <- repo.deleteConnectionRecord(UUID.randomUUID)
records <- repo.getConnectionRecords()
records <- repo.getConnectionRecords
} yield {
assertTrue(count == 0) &&
assertTrue(records.size == 2) &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE public.connection_records
ADD meta_next_retry TIMESTAMP WITH TIME ZONE;
Loading

0 comments on commit 904a2dc

Please sign in to comment.