Skip to content

Commit

Permalink
blocksync for leader
Browse files Browse the repository at this point in the history
  • Loading branch information
charithabandi committed Dec 2, 2024
1 parent 0ff2995 commit 5446e41
Show file tree
Hide file tree
Showing 14 changed files with 512 additions and 188 deletions.
13 changes: 7 additions & 6 deletions node/accounts/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package accounts

import (
"encoding/hex"
"errors"
"fmt"
"math/big"
)

var (
ErrInsufficientFunds = fmt.Errorf("insufficient funds")
ErrConvertToBigInt = fmt.Errorf("could not convert to big int")
ErrInvalidNonce = fmt.Errorf("invalid nonce")
ErrAccountNotFound = fmt.Errorf("account not found")
ErrNegativeBalance = fmt.Errorf("negative balance not permitted")
ErrNegativeTransfer = fmt.Errorf("negative transfer not permitted")
ErrInsufficientFunds = errors.New("insufficient funds")
ErrConvertToBigInt = errors.New("could not convert to big int")
ErrInvalidNonce = errors.New("invalid nonce")
ErrAccountNotFound = errors.New("account not found")
ErrNegativeBalance = errors.New("negative balance not permitted")
ErrNegativeTransfer = errors.New("negative transfer not permitted")
)

// errInsufficientFunds formats an error message for insufficient funds
Expand Down
7 changes: 4 additions & 3 deletions node/accounts/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package accounts

import (
"context"
"errors"
"fmt"
"math/big"

Expand All @@ -22,7 +23,7 @@ const (

sqlCreateAccount = `INSERT INTO ` + schemaName + `.accounts (identifier, balance, nonce) VALUES ($1, $2, $3)`

sqlCreateAccountIfNotExists = `INSERT INTO ` + schemaName + `.accounts (identifier, balance, nonce) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING`
// sqlCreateAccountIfNotExists = `INSERT INTO ` + schemaName + `.accounts (identifier, balance, nonce) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING`

sqlUpdateAccount = `UPDATE ` + schemaName + `.accounts SET balance = $1, nonce = $2
WHERE identifier = $3`
Expand Down Expand Up @@ -69,7 +70,7 @@ func getAccount(ctx context.Context, db sql.Executor, ident []byte) (*types.Acco

stringBal, ok := results.Rows[0][0].(string)
if !ok {
return nil, fmt.Errorf("failed to convert stored string balance to big int")
return nil, errors.New("failed to convert stored string balance to big int")
}

balance, ok := new(big.Int).SetString(stringBal, 10)
Expand All @@ -79,7 +80,7 @@ func getAccount(ctx context.Context, db sql.Executor, ident []byte) (*types.Acco

nonce, ok := results.Rows[0][1].(int64)
if !ok {
return nil, fmt.Errorf("failed to convert stored nonce to int64")
return nil, errors.New("failed to convert stored nonce to int64")
}

return &types.Account{
Expand Down
192 changes: 152 additions & 40 deletions node/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
Expand All @@ -20,6 +21,8 @@ import (
type (
ConsensusReset = types.ConsensusReset
AckRes = types.AckRes
DiscReq = types.DiscoveryRequest
DiscRes = types.DiscoveryResponse
)

type blockProp struct {
Expand Down Expand Up @@ -324,7 +327,7 @@ func (n *Node) startAckGossip(ctx context.Context, ps *pubsub.PubSub) error {
}

// We're only interested if we are the leader.
if !n.ce.AcceptACK() {
if n.ce.Role() != types.RoleLeader {
// n.log.Debugln("discarding ack meant for leader")
continue // discard, we are just relaying to leader
}
Expand Down Expand Up @@ -358,56 +361,165 @@ func (n *Node) startAckGossip(ctx context.Context, ps *pubsub.PubSub) error {
return nil
}

/* commented because we're probably going with gossipsub
func (n *Node) blkAckStreamHandler(s network.Stream) {
defer s.Close()
func (n *Node) sendDiscoveryRequest() {
n.log.Info("sending Discovery request")
n.discReq <- types.DiscoveryRequest{}
}

if !n.leader.Load() {
return
}
func (n *Node) sendDiscoveryResponse(bestHeight int64) {
n.log.Info("sending Discovery response", "height", bestHeight)
n.discResp <- types.DiscoveryResponse{BestHeight: bestHeight}
}

// "ack:blkid:appHash" // empty appHash means NACK
ackMsg := make([]byte, 128)
nr, err := s.Read(ackMsg)
func (n *Node) startDiscoveryRequestGossip(ctx context.Context, ps *pubsub.PubSub) error {
topicDisc, subDisc, err := subTopic(ctx, ps, TopicDiscReq)
if err != nil {
n.log.Infof("failed to read block proposal ID:", err)
return
}
blkAck, ok := bytes.CutPrefix(ackMsg[:nr], []byte(ackMsg))
if !ok {
n.log.Infof("bad block proposal ID:", ackMsg)
return
}
blkID, appHashStr, ok := strings.Cut(string(blkAck), ":")
if !ok {
n.log.Infof("bad block proposal ID:", blkAck)
return
return err
}

blkHash, err := types.NewHashFromString(blkID)
if err != nil {
n.log.Infof("bad block ID in ack msg:", err)
return
}
isNACK := len(appHashStr) == 0
if isNACK {
// do something
n.log.Infof("got nACK for block %v", blkHash)
return
}
subCanceled := make(chan struct{})
n.log.Info("starting Discovery request gossip")

n.wg.Add(1)
go func() {
defer func() {
<-subCanceled
topicDisc.Close()
n.wg.Done()
}()
for {
select {
case <-ctx.Done():
return
case <-n.discReq:
n.log.Debugln("publishing Discovery request")
err := topicDisc.Publish(ctx, nil)
if err != nil {
n.log.Warnf("Publish Discovery request failure: %v", err)
return
}
}
}
}()

me := n.host.ID()

go func() {
defer close(subCanceled)
defer subDisc.Cancel()
for {
discMsg, err := subDisc.Next(ctx)
if err != nil {
if !errors.Is(err, context.Canceled) {
n.log.Infof("subTx.Next:", err)
}
return
}

n.log.Infof("received Discovery request from %s (rcvd from %s)", hex.EncodeToString(discMsg.From), discMsg.ReceivedFrom.String())

// We're only interested if we are the validator.
if n.ce.Role() != types.RoleValidator {
continue // discard, we are just relaying to leader
}

if peer.ID(discMsg.From) == me {
// n.log.Infof("ACK message from me ignored")
continue
}

// Check the block store for the best height and respond
bestHeight, _, _ := n.bki.Best()
n.sendDiscoveryResponse(bestHeight)

n.log.Info("responded to Discovery request", "height", bestHeight)
}
}()

return nil
}

appHash, err := types.NewHashFromString(appHashStr)
func (n *Node) startDiscoveryResponseGossip(ctx context.Context, ps *pubsub.PubSub) error {
topicDisc, subDisc, err := subTopic(ctx, ps, TopicDiscResp)
if err != nil {
n.log.Infof("bad block ID in ack msg:", err)
return
return err
}

// as leader, we tally the responses
n.log.Infof("got ACK for block %v, app hash %v", blkHash, appHash)
subCanceled := make(chan struct{})

n.log.Info("starting Discovery response gossip")

n.wg.Add(1)
go func() {
defer func() {
<-subCanceled
topicDisc.Close()
n.wg.Done()
}()
for {
select {
case <-ctx.Done():
return
case msg := <-n.discResp:
n.log.Debugln("publishing Discovery Response message", msg.BestHeight)
discMsg, _ := msg.MarshalBinary()
err := topicDisc.Publish(ctx, discMsg)
if err != nil {
n.log.Warnf("Publish Discovery resp failure (%v): %v", msg.BestHeight, err)
return
}
}

}
}()

me := n.host.ID()

return
go func() {
defer close(subCanceled)
defer subDisc.Cancel()
for {
discMsg, err := subDisc.Next(ctx)
if err != nil {
if !errors.Is(err, context.Canceled) {
n.log.Infof("subTx.Next:", err)
}
return
}

// We're only interested if we are the leader.
if n.ce.Role() != types.RoleLeader {
continue // discard, we are just relaying to leader
}

if peer.ID(discMsg.From) == me {
// n.log.Infof("ACK message from me ignored")
continue
}

var dm DiscRes
err = dm.UnmarshalBinary(discMsg.Data)
if err != nil {
n.log.Infof("failed to decode Discovery msg: %v", err)
continue
}
fromPeerID := discMsg.GetFrom()

n.log.Infof("received Discovery response msg from %s (rcvd from %s), data = %d",
fromPeerID.String(), discMsg.ReceivedFrom.String(), dm.BestHeight)

peerPubKey, err := fromPeerID.ExtractPublicKey()
if err != nil {
n.log.Infof("failed to extract pubkey from peer ID %v: %v", fromPeerID, err)
continue
}
pubkeyBytes, _ := peerPubKey.Raw() // does not error for secp256k1 or ed25519
go n.ce.NotifyDiscoveryMessage(pubkeyBytes, dm.BestHeight)
}
}()

return nil
}
*/

func (n *Node) sendReset(height int64) error {
n.resetMsg <- types.ConsensusReset{
Expand Down
Loading

0 comments on commit 5446e41

Please sign in to comment.