Skip to content

Commit

Permalink
Generate unique nonces for concurrent blockchain txns (#330)
Browse files Browse the repository at this point in the history
A single address can not publish multiple transactions to the chain
concurrently. If it does, we see this:
```
$ cargo run --package xdbg -- -d -b local generate -e identity -a 5
    Finished `dev` profile [unoptimized] target(s) in 0.20s
     Running `target/debug/xdbg -d -b local generate -e identity -a 5`
Error: 
   0: API error: API client error: mls error: status: Internal, message: "error publishing group message: rpc error: code = Internal desc = error publishing identity update: nonce too low", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc"} }
   1: API client error: mls error: status: Internal, message: "error publishing group message: rpc error: code = Internal desc = error publishing identity update: nonce too low", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc"} }
   2: mls error: status: Internal, message: "error publishing group message: rpc error: code = Internal desc = error publishing identity update: nonce too low", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc"} }
   3: status: Internal, message: "error publishing group message: rpc error: code = Internal desc = error publishing identity update: nonce too low", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc"} }

```

This introduces a lock to prevent concurrent/parallel transactions.

This won't work in a HA scenario as multiple payer processes can have
the same private key. The solution is to give a new key to every payer.
  • Loading branch information
mkysel authored Dec 18, 2024
1 parent e732d08 commit 951306a
Showing 1 changed file with 76 additions and 1 deletion.
77 changes: 76 additions & 1 deletion pkg/blockchain/blockchainPublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package blockchain
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -23,6 +27,8 @@ type BlockchainPublisher struct {
messagesContract *abis.GroupMessages
identityUpdateContract *abis.IdentityUpdates
logger *zap.Logger
mutexNonce sync.Mutex
nonce *uint64
}

func NewBlockchainPublisher(
Expand All @@ -49,7 +55,6 @@ func NewBlockchainPublisher(
if err != nil {
return nil, err
}

return &BlockchainPublisher{
signer: signer,
logger: logger.Named("GroupBlockchainPublisher").
Expand All @@ -68,8 +73,15 @@ func (m *BlockchainPublisher) PublishGroupMessage(
if len(message) == 0 {
return nil, errors.New("message is empty")
}

nonce, err := m.fetchNonce(ctx)
if err != nil {
return nil, err
}

tx, err := m.messagesContract.AddMessage(&bind.TransactOpts{
Context: ctx,
Nonce: new(big.Int).SetUint64(nonce),
From: m.signer.FromAddress(),
Signer: m.signer.SignerFunc(),
}, groupID, message)
Expand Down Expand Up @@ -104,8 +116,15 @@ func (m *BlockchainPublisher) PublishIdentityUpdate(
if len(identityUpdate) == 0 {
return nil, errors.New("identity update is empty")
}

nonce, err := m.fetchNonce(ctx)
if err != nil {
return nil, err
}

tx, err := m.identityUpdateContract.AddIdentityUpdate(&bind.TransactOpts{
Context: ctx,
Nonce: new(big.Int).SetUint64(nonce),
From: m.signer.FromAddress(),
Signer: m.signer.SignerFunc(),
}, inboxId, identityUpdate)
Expand Down Expand Up @@ -135,6 +154,62 @@ func (m *BlockchainPublisher) PublishIdentityUpdate(
)
}

func (m *BlockchainPublisher) fetchNonce(ctx context.Context) (uint64, error) {
// NOTE:since pendingNonce starts at 0, and we have to return that value exactly,
// we can't easily use Once with unsigned integers
if m.nonce == nil {
m.mutexNonce.Lock()
defer m.mutexNonce.Unlock()
if m.nonce == nil {
// PendingNonceAt gives the next nonce that should be used
// if we are the first thread to initialize the nonce, we want to return PendingNonce+0
nonce, err := m.client.PendingNonceAt(ctx, m.signer.FromAddress())
if err != nil {
return 0, err
}
m.nonce = &nonce
m.logger.Info(fmt.Sprintf("Starting server with blockchain nonce: %d", *m.nonce))
return *m.nonce, nil
}
}
// Once the nonce has been initialized we can depend on Atomic to return the next value
next := atomic.AddUint64(m.nonce, 1)

pending, err := m.client.PendingNonceAt(ctx, m.signer.FromAddress())
if err != nil {
return 0, err
}

m.logger.Debug(
"Generated nonce",
zap.Uint64("pending_nonce", pending),
zap.Uint64("atomic_nonce", next),
)

if next >= pending {
// normal case scenario
return next, nil

}

// in some cases the chain nonce jumps ahead, and we need to handle this case
// this won't catch all possible timing scenarios, but it should self-heal if the chain jumps
m.mutexNonce.Lock()
defer m.mutexNonce.Unlock()
currentNonce := atomic.LoadUint64(m.nonce)
if currentNonce < pending {
m.logger.Info(
"Nonce skew detected",
zap.Uint64("pending_nonce", pending),
zap.Uint64("current_nonce", currentNonce),
)
atomic.StoreUint64(m.nonce, pending)
return pending, nil
}

return atomic.AddUint64(m.nonce, 1), nil
}

func findLog[T any](
receipt *types.Receipt,
parse func(types.Log) (*T, error),
Expand Down

0 comments on commit 951306a

Please sign in to comment.