Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pollux): Add retries field for ATL-3205 #380

Merged
merged 13 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pollux/lib/.scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
version = 3.5.9
runner.dialect = scala3

maxColumn = 120
maxColumn = 120
trailingCommas = preserve
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ final case class IssueCredentialRecord(
requestCredentialData: Option[RequestCredential],
issueCredentialData: Option[IssueCredential],
issuedCredentialRaw: Option[String],
issuingDID: Option[CanonicalPrismDID]
issuingDID: Option[CanonicalPrismDID],
metaRetries: Int,
metaNextRetry: Option[Instant],
metaLastFailure: Option[String],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an error string?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FabioPinheiro Maybe we can replace the error string with a wrapper error like this one?

Copy link
Contributor Author

@FabioPinheiro FabioPinheiro Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good point, let me try.

)
final case class ValidIssuedCredentialRecord(
id: UUID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,22 @@ package io.iohk.atala.pollux.core.model.error
sealed trait CredentialRepositoryError extends Throwable

object CredentialRepositoryError {
final case class UniqueConstraintViolation(message: String) extends CredentialRepositoryError

sealed trait SQLCredentialRepositoryError extends Throwable {
def errorMessage: String
}

/** @param code
* PSQLException's code
* @param errorMessage
* info abour the error code and error
*/
def fromPSQLException(code: String, errorMessage: String): SQLCredentialRepositoryError =
code.toIntOption.getOrElse(-1) match
case 23505 /* PSQLState.UNIQUE_VIOLATION */ => UniqueConstraintViolation(errorMessage = errorMessage)
shotexa marked this conversation as resolved.
Show resolved Hide resolved
case c => SQLErrorCredentialRepositoryError(c, errorMessage)

final case class UniqueConstraintViolation(errorMessage: String) extends SQLCredentialRepositoryError
final case class SQLErrorCredentialRepositoryError(code: Int, errorMessage: String)
extends SQLCredentialRepositoryError
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ trait CredentialRepository[F[_]] {

def getValidIssuedCredentials(recordId: Seq[UUID]): F[Seq[ValidIssuedCredentialRecord]]

def updateAfterFail(
recordId: UUID,
failReason: Option[String]
): Task[Int]

}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ class CredentialRepositoryInMemory(storeRef: Ref[Map[UUID, IssueCredentialRecord
idsStatesAndProofs: Seq[(UUID, PublicationState, MerkleInclusionProof)]
): Task[Int] = ???

def updateAfterFail(
recordId: UUID,
failReason: Option[String]
): Task[Int] = for {
maybeRecord <- getIssueCredentialRecord(recordId)
count <- maybeRecord
.map(record =>
for {
_ <- storeRef.update(r =>
r.updated(
recordId,
record.copy(
metaRetries = record.metaRetries - 1,
metaNextRetry =
if (record.metaRetries - 1 == 0) None
else Some(Instant.now().plusSeconds(60)), // TODO exponention time
metaLastFailure = failReason
)
)
)
} yield 1
)
.getOrElse(ZIO.succeed(0))
} yield count

}

object CredentialRepositoryInMemory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ object CredentialServiceImpl {

private class CredentialServiceImpl(
irisClient: IrisServiceStub,
credentialRepository: CredentialRepository[Task]
credentialRepository: CredentialRepository[Task],
maxRetries: Int = 5 // TODO move to config
shotexa marked this conversation as resolved.
Show resolved Hide resolved
) extends CredentialService {

import IssueCredentialRecord._
Expand Down Expand Up @@ -108,7 +109,10 @@ private class CredentialServiceImpl(
requestCredentialData = None,
issueCredentialData = None,
issuedCredentialRaw = None,
issuingDID = issuingDID
issuingDID = issuingDID,
metaRetries = maxRetries,
metaNextRetry = Some(Instant.now()),
metaLastFailure = None,
)
)
count <- credentialRepository
Expand Down Expand Up @@ -164,7 +168,10 @@ private class CredentialServiceImpl(
requestCredentialData = None,
issueCredentialData = None,
issuedCredentialRaw = None,
issuingDID = None
issuingDID = None,
metaRetries = maxRetries,
metaNextRetry = Some(Instant.now()),
metaLastFailure = None,
)
)
count <- credentialRepository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.UUID
import io.iohk.atala.castor.core.model.did.PrismDID

object CredentialRepositorySpecSuite {
val maxRetries = 2

private def issueCredentialRecord = IssueCredentialRecord(
id = UUID.randomUUID,
Expand All @@ -39,7 +40,10 @@ object CredentialRepositorySpecSuite {
requestCredentialData = None,
issueCredentialData = None,
issuedCredentialRaw = None,
issuingDID = None
issuingDID = None,
metaRetries = maxRetries,
metaNextRetry = Some(Instant.now()),
metaLastFailure = None,
)

private def requestCredential = RequestCredential(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

CREATE TABLE public.issue_credential_records(
"id" VARCHAR(36) NOT NULL PRIMARY KEY,
"created_at" BIGINT NOT NULL,
"updated_at" BIGINT,
"created_at" TIMESTAMP NOT NULL,
"updated_at" TIMESTAMP,
"thid" VARCHAR(36) NOT NULL,
"schema_id" VARCHAR(36),
"role" VARCHAR(50) NOT NULL,
Expand All @@ -29,13 +29,18 @@ CREATE TABLE public.issue_credential_records(
"offer_credential_data" TEXT,
"request_credential_data" TEXT,
"issue_credential_data" TEXT,
"issued_credential_raw" TEXT
"issued_credential_raw" TEXT,
"issuing_did" TEXT,
"meta_retries" BIGINT NOT NULL,
"meta_next_retry" TIMESTAMP,
"meta_last_failure" TEXT,
CONSTRAINT unique_thid UNIQUE (thid)
);

CREATE TABLE public.presentation_records(
"id" VARCHAR(36) NOT NULL PRIMARY KEY,
"created_at" BIGINT NOT NULL,
"updated_at" BIGINT,
"created_at" TIMESTAMP NOT NULL,
"updated_at" TIMESTAMP,
shotexa marked this conversation as resolved.
Show resolved Hide resolved
"thid" VARCHAR(36) NOT NULL,
"schema_id" VARCHAR(36),
"connection_id" VARCHAR(36),
Expand All @@ -44,5 +49,6 @@ CREATE TABLE public.presentation_records(
"protocol_state" VARCHAR(50) NOT NULL,
"request_presentation_data" TEXT,
"propose_presentation_data" TEXT,
"presentation_data" TEXT
"presentation_data" TEXT,
"credentials_to_use" TEXT[] NULL
);

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
ALTER TABLE public.issue_credential_records
ADD COLUMN "issuing_did" TEXT;
-- ALTER TABLE public.issue_credential_records
FabioPinheiro marked this conversation as resolved.
Show resolved Hide resolved
-- ADD COLUMN "issuing_did" TEXT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- ALTER TABLE public.issue_credential_records
-- ADD meta_retries BIGINT NOT NULL DEFAULT '3',
-- ADD meta_next_retry TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- ADD meta_last_failure TEXT;
FabioPinheiro marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import io.circe.syntax._
import io.iohk.atala.mercury.protocol.issuecredential.IssueCredential
import io.iohk.atala.mercury.protocol.issuecredential.OfferCredential
import io.iohk.atala.mercury.protocol.issuecredential.RequestCredential
import io.iohk.atala.castor.core.model.did._
import io.iohk.atala.pollux.core.model._
import io.iohk.atala.pollux.core.model.IssueCredentialRecord.ProtocolState
import io.iohk.atala.pollux.core.model.*
import io.iohk.atala.pollux.core.model.error.CredentialRepositoryError
import io.iohk.atala.pollux.core.model.error.CredentialRepositoryError._
import io.iohk.atala.pollux.core.repository.CredentialRepository
import io.iohk.atala.pollux.sql.model.JWTCredentialRow
Expand All @@ -23,8 +25,9 @@ import zio.interop.catz.*

import java.time.Instant
import java.util.UUID
import io.iohk.atala.castor.core.model.did.CanonicalPrismDID
import io.iohk.atala.castor.core.model.did.PrismDID

import org.postgresql.util.PSQLState
import java.sql.SQLException

// TODO: replace with actual implementation
class JdbcCredentialRepository(xa: Transactor[Task]) extends CredentialRepository[Task] {
Expand All @@ -47,8 +50,9 @@ class JdbcCredentialRepository(xa: Transactor[Task]) extends CredentialRepositor
given uuidGet: Get[UUID] = Get[String].map(UUID.fromString)
given uuidPut: Put[UUID] = Put[String].contramap(_.toString())

given instantGet: Get[Instant] = Get[Long].map(Instant.ofEpochSecond)
FabioPinheiro marked this conversation as resolved.
Show resolved Hide resolved
given instantPut: Put[Instant] = Put[Long].contramap(_.getEpochSecond())
// given instantGet: Get[Instant] = Get[Long].map(Instant.ofEpochSecond)
// given instantPut: Put[Instant] = Put[Long].contramap(_.getEpochSecond())
import doobie.postgres.implicits.JavaTimeInstantMeta

given protocolStateGet: Get[ProtocolState] = Get[String].map(ProtocolState.valueOf)
given protocolStatePut: Put[ProtocolState] = Put[String].contramap(_.toString)
Expand Down Expand Up @@ -93,7 +97,10 @@ class JdbcCredentialRepository(xa: Transactor[Task]) extends CredentialRepositor
| request_credential_data,
| issue_credential_data,
| issued_credential_raw,
| issuing_did
| issuing_did,
| meta_retries,
| meta_next_retry,
| meta_last_failure
| ) values (
| ${record.id},
| ${record.createdAt},
Expand All @@ -111,14 +118,17 @@ class JdbcCredentialRepository(xa: Transactor[Task]) extends CredentialRepositor
| ${record.requestCredentialData},
| ${record.issueCredentialData},
| ${record.issuedCredentialRaw},
| ${record.issuingDID}
| ${record.issuingDID},
| ${record.metaRetries},
| ${record.metaNextRetry},
| ${record.metaLastFailure}
| )
""".stripMargin.update

cxnIO.run
.transact(xa)
.mapError {
case e: PSQLException => UniqueConstraintViolation(e.getMessage())
case e: PSQLException => CredentialRepositoryError.fromPSQLException(e.getSQLState, e.getMessage)
case e => e
}
}
Expand All @@ -142,7 +152,10 @@ class JdbcCredentialRepository(xa: Transactor[Task]) extends CredentialRepositor
| request_credential_data,
| issue_credential_data,
| issued_credential_raw,
| issuing_did
| issuing_did,
| meta_retries,
| meta_next_retry,
| meta_last_failure
| FROM public.issue_credential_records
""".stripMargin
.query[IssueCredentialRecord]
Expand Down Expand Up @@ -179,7 +192,10 @@ class JdbcCredentialRepository(xa: Transactor[Task]) extends CredentialRepositor
| request_credential_data,
| issue_credential_data,
| issued_credential_raw,
| issuing_did
| issuing_did,
| meta_retries,
| meta_next_retry,
| meta_last_failure
| FROM public.issue_credential_records
| WHERE $inClauseFragment
""".stripMargin
Expand Down Expand Up @@ -209,7 +225,10 @@ class JdbcCredentialRepository(xa: Transactor[Task]) extends CredentialRepositor
| request_credential_data,
| issue_credential_data,
| issued_credential_raw,
| issuing_did
| issuing_did,
| meta_retries,
| meta_next_retry,
| meta_last_failure
| FROM public.issue_credential_records
| WHERE id = $recordId
""".stripMargin
Expand Down Expand Up @@ -239,7 +258,10 @@ class JdbcCredentialRepository(xa: Transactor[Task]) extends CredentialRepositor
| request_credential_data,
| issue_credential_data,
| issued_credential_raw,
| issuing_did
| issuing_did,
| meta_retries,
| meta_next_retry,
| meta_last_failure
| FROM public.issue_credential_records
| WHERE thid = $thid
""".stripMargin
Expand Down Expand Up @@ -409,6 +431,23 @@ class JdbcCredentialRepository(xa: Transactor[Task]) extends CredentialRepositor
cxnIO.run
.transact(xa)
}

def updateAfterFail(
recordId: UUID,
failReason: Option[String]
): Task[Int] = {
val cxnIO = sql"""
| UPDATE public.issue_credential_records
| SET
| meta_retries = meta_retries - 1,
| meta_next_retry = ${Instant.now().plusSeconds(60)},
| meta_last_failure = ${failReason}
| WHERE
| id = $recordId
""".stripMargin.update
// FIXME meta_next_retry = ${if (record.metaRetries - 1 == 0) None else Some(Instant.now().plusSeconds(60))} // TODO exponention time
cxnIO.run.transact(xa)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have not you implemented meta_next_retry in an exponential way, the same way as in pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/CredentialRepositoryInMemory.scala ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mainly because I am not good with SQL, and I forgot
I going to put this
meta_next_retry = CASE WHEN (meta_retries > 1) THEN ${Instant.now().plusSeconds(60)} ELSE null END,
but is still not exponential

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if Instant.now() is the string 2023-02-23T18:37:44.082381Z how can I add plus N mins on the SQL query?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you necessarily need to write one update query that does all the logic via SQL, I mean that would be ideal but, you can just make one SELECT statement, get values, calculate via regular scala code, and then run UPDATE with that value. Yes it is not ideal and we are making 2 queries instead of one but it is better than incorrect logic, and after all, if this becomes a performance issue we can figure it out later.

p.s

if Instant.now() is the string 2023-02-23T18:37:44.082381Z how can I add plus N mins on the SQL query?

I don't know, I'd have to google myself :p

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer those operations to be atomic.
I was already bitten too many times when I was doing ETL of other databases.
I would pretty much invest time in giving the responsibility to the database. Instead of adding logic into the code to do that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to implement this logic via DB query that is fine with me (it is good actually). If you don't want to do it right now in this PR, then I'd ask to align this logic with InMemory implementation logic, because otherwise, inMemory has exponential increase and regular DB does not, I think that would be quite confusing for other devs who will work on this codebase.

Not having an exponential increase is fina for the first iteration, in my opinion, just want to make sure 2 implementations are aligned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @bvoiturier
I was thinking about how useful this CredentialRepositoryInMemory is?
I was going to propose to delete this. @bvoiturier what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I create a ticket https://input-output.atlassian.net/browse/ATL-3686
In order to merge this

}

object JdbcCredentialRepository {
Expand Down
Loading