Skip to content

Commit

Permalink
Merge pull request #92 from KenshiTech/remove-correctness-cache-86
Browse files Browse the repository at this point in the history
Get rid of consensus and aggregate cache
  • Loading branch information
pouya-eghbali authored Mar 27, 2024
2 parents 163fef1 + 08023d2 commit 7712412
Showing 1 changed file with 53 additions and 90 deletions.
143 changes: 53 additions & 90 deletions src/plugins/correctness/correctness.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,25 @@ package correctness

import (
"context"
"math/big"
"sync"
"time"

"github.com/KenshiTech/unchained/address"
"github.com/KenshiTech/unchained/config"
"github.com/KenshiTech/unchained/crypto/bls"
"github.com/KenshiTech/unchained/crypto/shake"
"github.com/KenshiTech/unchained/datasets"
"github.com/KenshiTech/unchained/db"
"github.com/KenshiTech/unchained/ent"
"github.com/KenshiTech/unchained/ent/correctnessreport"
"github.com/KenshiTech/unchained/ent/signer"
"github.com/KenshiTech/unchained/ethereum"
"github.com/KenshiTech/unchained/log"
"github.com/KenshiTech/unchained/pos"
"github.com/KenshiTech/unchained/utils"

bls12381 "github.com/consensys/gnark-crypto/ecc/bls12-381"
lru "github.com/hashicorp/golang-lru/v2"
)

type Key struct {
Topic [64]byte
Hash [64]byte
Correct bool
}

var consensus *lru.Cache[Key, map[bls12381.G1Affine]big.Int]
var signatureCache *lru.Cache[bls12381.G1Affine, []datasets.Signature]
var aggregateCache *lru.Cache[bls12381.G1Affine, bls12381.G1Affine]
var DebouncedSaveSignatures func(key bls12381.G1Affine, arg SaveSignatureArgs)
var signatureMutex *sync.Mutex
var supportedTopics map[[64]byte]bool
Expand Down Expand Up @@ -74,75 +63,32 @@ func RecordSignature(
signatureMutex.Lock()
defer signatureMutex.Unlock()

key := Key{
Topic: info.Topic,
Hash: info.Hash,
Correct: info.Correct,
}

if !consensus.Contains(key) {
consensus.Add(key, make(map[bls12381.G1Affine]big.Int))
}

consensusChain := config.Config.GetString("pos.chain")
blockNumber, err := GetBlockNumber(consensusChain)
signatures, ok := signatureCache.Get(hash)

if err != nil {
log.Logger.
With("Error", err).
Error("Failed to get the latest block number")
return
}

votingPower, err := pos.GetVotingPowerOfPublicKey(
signer.PublicKey,
big.NewInt(int64(*blockNumber)),
)

if err != nil {
log.Logger.
With("Address", address.Calculate(signer.PublicKey[:])).
With("Error", err).
Error("Failed to get voting power")
return
if !ok {
signatures = make([]datasets.Signature, 0)
}

reportedValues, _ := consensus.Get(key)
voted := reportedValues[hash]
totalVoted := new(big.Int).Add(votingPower, &voted)
isMajority := true

for _, reportCount := range reportedValues {
if reportCount.Cmp(totalVoted) == 1 {
isMajority = false
break
// Check for duplicates
for _, sig := range signatures {
if sig.Signer.PublicKey == signer.PublicKey {
return
}
}

cached, _ := signatureCache.Get(hash)

packed := datasets.Signature{
Signature: signature,
Signer: signer,
Processed: false,
}

for _, item := range cached {
if item.Signer.PublicKey == signer.PublicKey {
return
}
}

reportedValues[hash] = *totalVoted
cached = append(cached, packed)
signatureCache.Add(hash, cached)
signatures = append(signatures, packed)
signatureCache.Add(hash, signatures)

if isMajority {
if debounce {
DebouncedSaveSignatures(hash, SaveSignatureArgs{Hash: hash, Info: info})
} else {
SaveSignatures(SaveSignatureArgs{Hash: hash, Info: info})
}
if debounce {
DebouncedSaveSignatures(hash, SaveSignatureArgs{Hash: hash, Info: info})
} else {
SaveSignatures(SaveSignatureArgs{Hash: hash, Info: info})
}
}

Expand All @@ -163,14 +109,40 @@ func SaveSignatures(args SaveSignatureArgs) {
for i := range signatures {
signature := signatures[i]
keys = append(keys, signature.Signer.PublicKey[:])
if !signature.Processed {
newSignatures = append(newSignatures, signature.Signature)
newSigners = append(newSigners, signature.Signer)
}

currentRecord, err := dbClient.CorrectnessReport.
Query().
Where(correctnessreport.And(
correctnessreport.Hash(args.Info.Hash[:]),
correctnessreport.Topic(args.Info.Topic[:]),
correctnessreport.Timestamp(args.Info.Timestamp),
correctnessreport.Correct(args.Info.Correct),
)).
Only(ctx)

if err != nil {
panic(err)
}

// Select the new signers and signatures
for i := range signatures {
signature := signatures[i]

if currentRecord != nil {
for _, signer := range currentRecord.Edges.Signers {
if signature.Signer.PublicKey == [96]byte(signer.Key) {
continue
}
}
}

newSigners = append(newSigners, signature.Signer)
newSignatures = append(newSignatures, signature.Signature)
}

// TODO: This part can be a shared library
err := dbClient.Signer.MapCreateBulk(newSigners, func(sc *ent.SignerCreate, i int) {
err = dbClient.Signer.MapCreateBulk(newSigners, func(sc *ent.SignerCreate, i int) {
signer := newSigners[i]
sc.SetName(signer.Name).
SetEvm(signer.EvmWallet).
Expand Down Expand Up @@ -201,10 +173,15 @@ func SaveSignatures(args SaveSignatureArgs) {
}

var aggregate bls12381.G1Affine
currentAggregate, ok := aggregateCache.Get(args.Hash)

if ok {
newSignatures = append(newSignatures, currentAggregate)
if currentRecord != nil {
currentSignature, err := bls.RecoverSignature([48]byte(currentRecord.Signature))

if err != nil {
panic(err)
}

newSignatures = append(newSignatures, currentSignature)
}

aggregate, err = bls.AggregateSignatures(newSignatures)
Expand Down Expand Up @@ -232,11 +209,7 @@ func SaveSignatures(args SaveSignatureArgs) {
panic(err)
}

for inx := range signatures {
signatures[inx].Processed = true
}

aggregateCache.Add(args.Hash, aggregate)
signatureCache.Remove(args.Hash)
}

func Setup() {
Expand Down Expand Up @@ -265,14 +238,4 @@ func init() {
if err != nil {
panic(err)
}

consensus, err = lru.New[Key, map[bls12381.G1Affine]big.Int](LruSize)
if err != nil {
panic(err)
}

aggregateCache, err = lru.New[bls12381.G1Affine, bls12381.G1Affine](LruSize)
if err != nil {
panic(err)
}
}

0 comments on commit 7712412

Please sign in to comment.