Skip to content

Commit

Permalink
X-chain SDK gossip (#2490)
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kim <[email protected]>
Signed-off-by: Stephen Buttolph <[email protected]>
Co-authored-by: Stephen Buttolph <[email protected]>
Co-authored-by: Dhruba Basu <[email protected]>
  • Loading branch information
3 people authored Dec 22, 2023
1 parent 5888ac3 commit 33f7411
Show file tree
Hide file tree
Showing 11 changed files with 739 additions and 141 deletions.
34 changes: 34 additions & 0 deletions vms/avm/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package avm

import (
"encoding/json"

"github.com/ava-labs/avalanchego/vms/avm/network"
)

var DefaultConfig = Config{
Network: network.DefaultConfig,
IndexTransactions: false,
IndexAllowIncomplete: false,
ChecksumsEnabled: false,
}

type Config struct {
Network network.Config `json:"network"`
IndexTransactions bool `json:"index-transactions"`
IndexAllowIncomplete bool `json:"index-allow-incomplete"`
ChecksumsEnabled bool `json:"checksums-enabled"`
}

func ParseConfig(configBytes []byte) (Config, error) {
if len(configBytes) == 0 {
return DefaultConfig, nil
}

config := DefaultConfig
err := json.Unmarshal(configBytes, &config)
return config, err
}
67 changes: 67 additions & 0 deletions vms/avm/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package avm

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/vms/avm/network"
)

func TestParseConfig(t *testing.T) {
tests := []struct {
name string
configBytes []byte
expectedConfig Config
}{
{
name: "unspecified config",
configBytes: nil,
expectedConfig: DefaultConfig,
},
{
name: "manually specified checksums enabled",
configBytes: []byte(`{"checksums-enabled":true}`),
expectedConfig: Config{
Network: network.DefaultConfig,
IndexTransactions: DefaultConfig.IndexTransactions,
IndexAllowIncomplete: DefaultConfig.IndexAllowIncomplete,
ChecksumsEnabled: true,
},
},
{
name: "manually specified checksums enabled",
configBytes: []byte(`{"network":{"max-validator-set-staleness":1}}`),
expectedConfig: Config{
Network: network.Config{
MaxValidatorSetStaleness: time.Nanosecond,
TargetGossipSize: network.DefaultConfig.TargetGossipSize,
PullGossipPollSize: network.DefaultConfig.PullGossipPollSize,
PullGossipFrequency: network.DefaultConfig.PullGossipFrequency,
PullGossipThrottlingPeriod: network.DefaultConfig.PullGossipThrottlingPeriod,
PullGossipThrottlingLimit: network.DefaultConfig.PullGossipThrottlingLimit,
ExpectedBloomFilterElements: network.DefaultConfig.ExpectedBloomFilterElements,
ExpectedBloomFilterFalsePositiveProbability: network.DefaultConfig.ExpectedBloomFilterFalsePositiveProbability,
MaxBloomFilterFalsePositiveProbability: network.DefaultConfig.MaxBloomFilterFalsePositiveProbability,
LegacyPushGossipCacheSize: network.DefaultConfig.LegacyPushGossipCacheSize,
},
IndexTransactions: DefaultConfig.IndexTransactions,
IndexAllowIncomplete: DefaultConfig.IndexAllowIncomplete,
ChecksumsEnabled: DefaultConfig.ChecksumsEnabled,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
require := require.New(t)

config, err := ParseConfig(test.configBytes)
require.NoError(err)
require.Equal(test.expectedConfig, config)
})
}
}
5 changes: 2 additions & 3 deletions vms/avm/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,8 @@ func setup(tb testing.TB, c *envConfig) *environment {
Config: vmStaticConfig,
}

vmDynamicConfig := Config{
IndexTransactions: true,
}
vmDynamicConfig := DefaultConfig
vmDynamicConfig.IndexTransactions = true
if c.vmDynamicConfig != nil {
vmDynamicConfig = *c.vmDynamicConfig
}
Expand Down
66 changes: 66 additions & 0 deletions vms/avm/network/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package network

import (
"time"

"github.com/ava-labs/avalanchego/utils/units"
)

var DefaultConfig = Config{
MaxValidatorSetStaleness: time.Minute,
TargetGossipSize: 20 * units.KiB,
PullGossipPollSize: 1,
PullGossipFrequency: 1500 * time.Millisecond,
PullGossipThrottlingPeriod: 10 * time.Second,
PullGossipThrottlingLimit: 2,
ExpectedBloomFilterElements: 8 * 1024,
ExpectedBloomFilterFalsePositiveProbability: .01,
MaxBloomFilterFalsePositiveProbability: .05,
LegacyPushGossipCacheSize: 512,
}

type Config struct {
// MaxValidatorSetStaleness limits how old of a validator set the network
// will use for peer sampling and rate limiting.
MaxValidatorSetStaleness time.Duration `json:"max-validator-set-staleness"`
// TargetGossipSize is the number of bytes that will be attempted to be
// sent when pushing transactions and when responded to transaction pull
// requests.
TargetGossipSize int `json:"target-gossip-size"`
// PullGossipPollSize is the number of validators to sample when performing
// a round of pull gossip.
PullGossipPollSize int `json:"pull-gossip-poll-size"`
// PullGossipFrequency is how frequently rounds of pull gossip are
// performed.
PullGossipFrequency time.Duration `json:"pull-gossip-frequency"`
// PullGossipThrottlingPeriod is how large of a window the throttler should
// use.
PullGossipThrottlingPeriod time.Duration `json:"pull-gossip-throttling-period"`
// PullGossipThrottlingLimit is the number of pull querys that are allowed
// by a validator in every throttling window.
PullGossipThrottlingLimit int `json:"pull-gossip-throttling-limit"`
// ExpectedBloomFilterElements is the number of elements to expect when
// creating a new bloom filter. The larger this number is, the larger the
// bloom filter will be.
ExpectedBloomFilterElements uint64 `json:"expected-bloom-filter-elements"`
// ExpectedBloomFilterFalsePositiveProbability is the expected probability
// of a false positive after having inserted ExpectedBloomFilterElements
// into a bloom filter. The smaller this number is, the larger the bloom
// filter will be.
ExpectedBloomFilterFalsePositiveProbability float64 `json:"expected-bloom-filter-false-positive-probability"`
// MaxBloomFilterFalsePositiveProbability is used to determine when the
// bloom filter should be refreshed. Once the expected probability of a
// false positive exceeds this value, the bloom filter will be regenerated.
// The smaller this number is, the more frequently that the bloom filter
// will be regenerated.
MaxBloomFilterFalsePositiveProbability float64 `json:"max-bloom-filter-false-positive-probability"`
// LegacyPushGossipCacheSize tracks the most recently received transactions
// and ensures to only gossip them once.
//
// Deprecated: The legacy push gossip mechanism is deprecated in favor of
// the p2p SDK's push gossip mechanism.
LegacyPushGossipCacheSize int `json:"legacy-push-gossip-cache-size"`
}
157 changes: 157 additions & 0 deletions vms/avm/network/gossip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package network

import (
"context"
"fmt"
"sync"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/network/p2p/gossip"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/vms/avm/txs"
"github.com/ava-labs/avalanchego/vms/avm/txs/mempool"
)

var (
_ p2p.Handler = (*txGossipHandler)(nil)
_ gossip.Set[*txs.Tx] = (*gossipMempool)(nil)
_ gossip.Marshaller[*txs.Tx] = (*txParser)(nil)
)

// txGossipHandler is the handler called when serving gossip messages
type txGossipHandler struct {
p2p.NoOpHandler
appGossipHandler p2p.Handler
appRequestHandler p2p.Handler
}

func (t txGossipHandler) AppGossip(
ctx context.Context,
nodeID ids.NodeID,
gossipBytes []byte,
) {
t.appGossipHandler.AppGossip(ctx, nodeID, gossipBytes)
}

func (t txGossipHandler) AppRequest(
ctx context.Context,
nodeID ids.NodeID,
deadline time.Time,
requestBytes []byte,
) ([]byte, error) {
return t.appRequestHandler.AppRequest(ctx, nodeID, deadline, requestBytes)
}

type txParser struct {
parser txs.Parser
}

func (*txParser) MarshalGossip(tx *txs.Tx) ([]byte, error) {
return tx.Bytes(), nil
}

func (g *txParser) UnmarshalGossip(bytes []byte) (*txs.Tx, error) {
return g.parser.ParseTx(bytes)
}

func newGossipMempool(
mempool mempool.Mempool,
log logging.Logger,
txVerifier TxVerifier,
parser txs.Parser,
maxExpectedElements uint64,
falsePositiveProbability,
maxFalsePositiveProbability float64,
) (*gossipMempool, error) {
bloom, err := gossip.NewBloomFilter(maxExpectedElements, falsePositiveProbability)
return &gossipMempool{
Mempool: mempool,
log: log,
txVerifier: txVerifier,
parser: parser,
maxFalsePositiveProbability: maxFalsePositiveProbability,
bloom: bloom,
}, err
}

type gossipMempool struct {
mempool.Mempool
log logging.Logger
txVerifier TxVerifier
parser txs.Parser
maxFalsePositiveProbability float64

lock sync.RWMutex
bloom *gossip.BloomFilter
}

// Add is called by the p2p SDK when handling transactions that were pushed to
// us and when handling transactions that were pulled from a peer. If this
// returns a nil error while handling push gossip, the p2p SDK will queue the
// transaction to push gossip as well.
func (g *gossipMempool) Add(tx *txs.Tx) error {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID)
}

if reason := g.Mempool.GetDropReason(txID); reason != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
return reason
}

// Verify the tx at the currently preferred state
if err := g.txVerifier.VerifyTx(tx); err != nil {
g.Mempool.MarkDropped(txID, err)
return err
}

return g.AddVerified(tx)
}

func (g *gossipMempool) AddVerified(tx *txs.Tx) error {
if err := g.Mempool.Add(tx); err != nil {
g.Mempool.MarkDropped(tx.ID(), err)
return err
}

g.lock.Lock()
defer g.lock.Unlock()

g.bloom.Add(tx)
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.maxFalsePositiveProbability)
if err != nil {
return err
}

if reset {
g.log.Debug("resetting bloom filter")
g.Mempool.Iterate(func(tx *txs.Tx) bool {
g.bloom.Add(tx)
return true
})
}

g.Mempool.RequestBuildBlock()
return nil
}

func (g *gossipMempool) Iterate(f func(*txs.Tx) bool) {
g.Mempool.Iterate(f)
}

func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte, err error) {
g.lock.RLock()
defer g.lock.RUnlock()

bloomBytes, err := g.bloom.Bloom.MarshalBinary()
return bloomBytes, g.bloom.Salt[:], err
}
Loading

0 comments on commit 33f7411

Please sign in to comment.