Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get rid of consensus and aggregate cache #92

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 53 additions & 90 deletions src/plugins/correctness/correctness.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,25 @@ package correctness

import (
"context"
"math/big"
"sync"
"time"

"github.com/KenshiTech/unchained/address"
"github.com/KenshiTech/unchained/config"
"github.com/KenshiTech/unchained/crypto/bls"
"github.com/KenshiTech/unchained/crypto/shake"
"github.com/KenshiTech/unchained/datasets"
"github.com/KenshiTech/unchained/db"
"github.com/KenshiTech/unchained/ent"
"github.com/KenshiTech/unchained/ent/correctnessreport"
"github.com/KenshiTech/unchained/ent/signer"
"github.com/KenshiTech/unchained/ethereum"
"github.com/KenshiTech/unchained/log"
"github.com/KenshiTech/unchained/pos"
"github.com/KenshiTech/unchained/utils"

bls12381 "github.com/consensys/gnark-crypto/ecc/bls12-381"
lru "github.com/hashicorp/golang-lru/v2"
)

type Key struct {
Topic [64]byte
Hash [64]byte
Correct bool
}

var consensus *lru.Cache[Key, map[bls12381.G1Affine]big.Int]
var signatureCache *lru.Cache[bls12381.G1Affine, []datasets.Signature]
var aggregateCache *lru.Cache[bls12381.G1Affine, bls12381.G1Affine]
var DebouncedSaveSignatures func(key bls12381.G1Affine, arg SaveSignatureArgs)
var signatureMutex *sync.Mutex
var supportedTopics map[[64]byte]bool
Expand Down Expand Up @@ -74,75 +63,32 @@ func RecordSignature(
signatureMutex.Lock()
defer signatureMutex.Unlock()

key := Key{
Topic: info.Topic,
Hash: info.Hash,
Correct: info.Correct,
}

if !consensus.Contains(key) {
consensus.Add(key, make(map[bls12381.G1Affine]big.Int))
}

consensusChain := config.Config.GetString("pos.chain")
blockNumber, err := GetBlockNumber(consensusChain)
signatures, ok := signatureCache.Get(hash)

if err != nil {
log.Logger.
With("Error", err).
Error("Failed to get the latest block number")
return
}

votingPower, err := pos.GetVotingPowerOfPublicKey(
signer.PublicKey,
big.NewInt(int64(*blockNumber)),
)

if err != nil {
log.Logger.
With("Address", address.Calculate(signer.PublicKey[:])).
With("Error", err).
Error("Failed to get voting power")
return
if !ok {
signatures = make([]datasets.Signature, 0)
}

reportedValues, _ := consensus.Get(key)
voted := reportedValues[hash]
totalVoted := new(big.Int).Add(votingPower, &voted)
isMajority := true

for _, reportCount := range reportedValues {
if reportCount.Cmp(totalVoted) == 1 {
isMajority = false
break
// Check for duplicates
for _, sig := range signatures {
if sig.Signer.PublicKey == signer.PublicKey {
return
}
}

cached, _ := signatureCache.Get(hash)

packed := datasets.Signature{
Signature: signature,
Signer: signer,
Processed: false,
}

for _, item := range cached {
if item.Signer.PublicKey == signer.PublicKey {
return
}
}

reportedValues[hash] = *totalVoted
cached = append(cached, packed)
signatureCache.Add(hash, cached)
signatures = append(signatures, packed)
signatureCache.Add(hash, signatures)

if isMajority {
if debounce {
DebouncedSaveSignatures(hash, SaveSignatureArgs{Hash: hash, Info: info})
} else {
SaveSignatures(SaveSignatureArgs{Hash: hash, Info: info})
}
if debounce {
DebouncedSaveSignatures(hash, SaveSignatureArgs{Hash: hash, Info: info})
} else {
SaveSignatures(SaveSignatureArgs{Hash: hash, Info: info})
}
}

Expand All @@ -163,14 +109,40 @@ func SaveSignatures(args SaveSignatureArgs) {
for i := range signatures {
signature := signatures[i]
keys = append(keys, signature.Signer.PublicKey[:])
if !signature.Processed {
newSignatures = append(newSignatures, signature.Signature)
newSigners = append(newSigners, signature.Signer)
}

currentRecord, err := dbClient.CorrectnessReport.
Query().
Where(correctnessreport.And(
correctnessreport.Hash(args.Info.Hash[:]),
correctnessreport.Topic(args.Info.Topic[:]),
correctnessreport.Timestamp(args.Info.Timestamp),
correctnessreport.Correct(args.Info.Correct),
)).
Only(ctx)

if err != nil {
panic(err)
}

// Select the new signers and signatures
for i := range signatures {
signature := signatures[i]

if currentRecord != nil {
for _, signer := range currentRecord.Edges.Signers {
if signature.Signer.PublicKey == [96]byte(signer.Key) {
continue
}
}
}

newSigners = append(newSigners, signature.Signer)
newSignatures = append(newSignatures, signature.Signature)
}

// TODO: This part can be a shared library
err := dbClient.Signer.MapCreateBulk(newSigners, func(sc *ent.SignerCreate, i int) {
err = dbClient.Signer.MapCreateBulk(newSigners, func(sc *ent.SignerCreate, i int) {
signer := newSigners[i]
sc.SetName(signer.Name).
SetEvm(signer.EvmWallet).
Expand Down Expand Up @@ -201,10 +173,15 @@ func SaveSignatures(args SaveSignatureArgs) {
}

var aggregate bls12381.G1Affine
currentAggregate, ok := aggregateCache.Get(args.Hash)

if ok {
newSignatures = append(newSignatures, currentAggregate)
if currentRecord != nil {
currentSignature, err := bls.RecoverSignature([48]byte(currentRecord.Signature))

if err != nil {
panic(err)
}

newSignatures = append(newSignatures, currentSignature)
}

aggregate, err = bls.AggregateSignatures(newSignatures)
Expand Down Expand Up @@ -232,11 +209,7 @@ func SaveSignatures(args SaveSignatureArgs) {
panic(err)
}

for inx := range signatures {
signatures[inx].Processed = true
}

aggregateCache.Add(args.Hash, aggregate)
signatureCache.Remove(args.Hash)
}

func Setup() {
Expand Down Expand Up @@ -265,14 +238,4 @@ func init() {
if err != nil {
panic(err)
}

consensus, err = lru.New[Key, map[bls12381.G1Affine]big.Int](LruSize)
if err != nil {
panic(err)
}

aggregateCache, err = lru.New[bls12381.G1Affine, bls12381.G1Affine](LruSize)
if err != nil {
panic(err)
}
}
Loading