From 33f74116aafe961a9c72d1f3594573151c6c1ea6 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Fri, 22 Dec 2023 16:21:23 -0500 Subject: [PATCH] X-chain SDK gossip (#2490) Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Signed-off-by: Stephen Buttolph Co-authored-by: Stephen Buttolph Co-authored-by: Dhruba Basu <7675102+dhrubabasu@users.noreply.github.com> --- vms/avm/config.go | 34 ++++ vms/avm/config_test.go | 67 ++++++++ vms/avm/environment_test.go | 5 +- vms/avm/network/config.go | 66 ++++++++ vms/avm/network/gossip.go | 157 +++++++++++++++++++ vms/avm/network/gossip_test.go | 128 +++++++++++++++ vms/avm/network/network.go | 268 +++++++++++++++++++++----------- vms/avm/network/network_test.go | 48 ++++-- vms/avm/txs/tx.go | 8 + vms/avm/vm.go | 94 +++++++---- vms/avm/vm_test.go | 5 +- 11 files changed, 739 insertions(+), 141 deletions(-) create mode 100644 vms/avm/config.go create mode 100644 vms/avm/config_test.go create mode 100644 vms/avm/network/config.go create mode 100644 vms/avm/network/gossip.go create mode 100644 vms/avm/network/gossip_test.go diff --git a/vms/avm/config.go b/vms/avm/config.go new file mode 100644 index 000000000000..75f0194b15a2 --- /dev/null +++ b/vms/avm/config.go @@ -0,0 +1,34 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package avm + +import ( + "encoding/json" + + "github.com/ava-labs/avalanchego/vms/avm/network" +) + +var DefaultConfig = Config{ + Network: network.DefaultConfig, + IndexTransactions: false, + IndexAllowIncomplete: false, + ChecksumsEnabled: false, +} + +type Config struct { + Network network.Config `json:"network"` + IndexTransactions bool `json:"index-transactions"` + IndexAllowIncomplete bool `json:"index-allow-incomplete"` + ChecksumsEnabled bool `json:"checksums-enabled"` +} + +func ParseConfig(configBytes []byte) (Config, error) { + if len(configBytes) == 0 { + return DefaultConfig, nil + } + + config := DefaultConfig + err := json.Unmarshal(configBytes, &config) + return config, err +} diff --git a/vms/avm/config_test.go b/vms/avm/config_test.go new file mode 100644 index 000000000000..e11115028fd6 --- /dev/null +++ b/vms/avm/config_test.go @@ -0,0 +1,67 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package avm + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/vms/avm/network" +) + +func TestParseConfig(t *testing.T) { + tests := []struct { + name string + configBytes []byte + expectedConfig Config + }{ + { + name: "unspecified config", + configBytes: nil, + expectedConfig: DefaultConfig, + }, + { + name: "manually specified checksums enabled", + configBytes: []byte(`{"checksums-enabled":true}`), + expectedConfig: Config{ + Network: network.DefaultConfig, + IndexTransactions: DefaultConfig.IndexTransactions, + IndexAllowIncomplete: DefaultConfig.IndexAllowIncomplete, + ChecksumsEnabled: true, + }, + }, + { + name: "manually specified checksums enabled", + configBytes: []byte(`{"network":{"max-validator-set-staleness":1}}`), + expectedConfig: Config{ + Network: network.Config{ + MaxValidatorSetStaleness: time.Nanosecond, + TargetGossipSize: network.DefaultConfig.TargetGossipSize, + PullGossipPollSize: 
network.DefaultConfig.PullGossipPollSize, + PullGossipFrequency: network.DefaultConfig.PullGossipFrequency, + PullGossipThrottlingPeriod: network.DefaultConfig.PullGossipThrottlingPeriod, + PullGossipThrottlingLimit: network.DefaultConfig.PullGossipThrottlingLimit, + ExpectedBloomFilterElements: network.DefaultConfig.ExpectedBloomFilterElements, + ExpectedBloomFilterFalsePositiveProbability: network.DefaultConfig.ExpectedBloomFilterFalsePositiveProbability, + MaxBloomFilterFalsePositiveProbability: network.DefaultConfig.MaxBloomFilterFalsePositiveProbability, + LegacyPushGossipCacheSize: network.DefaultConfig.LegacyPushGossipCacheSize, + }, + IndexTransactions: DefaultConfig.IndexTransactions, + IndexAllowIncomplete: DefaultConfig.IndexAllowIncomplete, + ChecksumsEnabled: DefaultConfig.ChecksumsEnabled, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + config, err := ParseConfig(test.configBytes) + require.NoError(err) + require.Equal(test.expectedConfig, config) + }) + } +} diff --git a/vms/avm/environment_test.go b/vms/avm/environment_test.go index 5616371041ea..7811807521e0 100644 --- a/vms/avm/environment_test.go +++ b/vms/avm/environment_test.go @@ -157,9 +157,8 @@ func setup(tb testing.TB, c *envConfig) *environment { Config: vmStaticConfig, } - vmDynamicConfig := Config{ - IndexTransactions: true, - } + vmDynamicConfig := DefaultConfig + vmDynamicConfig.IndexTransactions = true if c.vmDynamicConfig != nil { vmDynamicConfig = *c.vmDynamicConfig } diff --git a/vms/avm/network/config.go b/vms/avm/network/config.go new file mode 100644 index 000000000000..2ff7828df2e4 --- /dev/null +++ b/vms/avm/network/config.go @@ -0,0 +1,66 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package network + +import ( + "time" + + "github.com/ava-labs/avalanchego/utils/units" +) + +var DefaultConfig = Config{ + MaxValidatorSetStaleness: time.Minute, + TargetGossipSize: 20 * units.KiB, + PullGossipPollSize: 1, + PullGossipFrequency: 1500 * time.Millisecond, + PullGossipThrottlingPeriod: 10 * time.Second, + PullGossipThrottlingLimit: 2, + ExpectedBloomFilterElements: 8 * 1024, + ExpectedBloomFilterFalsePositiveProbability: .01, + MaxBloomFilterFalsePositiveProbability: .05, + LegacyPushGossipCacheSize: 512, +} + +type Config struct { + // MaxValidatorSetStaleness limits how old of a validator set the network + // will use for peer sampling and rate limiting. + MaxValidatorSetStaleness time.Duration `json:"max-validator-set-staleness"` + // TargetGossipSize is the number of bytes that will be attempted to be + // sent when pushing transactions and when responding to transaction pull + // requests. + TargetGossipSize int `json:"target-gossip-size"` + // PullGossipPollSize is the number of validators to sample when performing + // a round of pull gossip. + PullGossipPollSize int `json:"pull-gossip-poll-size"` + // PullGossipFrequency is how frequently rounds of pull gossip are + // performed. + PullGossipFrequency time.Duration `json:"pull-gossip-frequency"` + // PullGossipThrottlingPeriod is how large of a window the throttler should + // use. + PullGossipThrottlingPeriod time.Duration `json:"pull-gossip-throttling-period"` + // PullGossipThrottlingLimit is the number of pull queries that are allowed + // by a validator in every throttling window. + PullGossipThrottlingLimit int `json:"pull-gossip-throttling-limit"` + // ExpectedBloomFilterElements is the number of elements to expect when + // creating a new bloom filter. The larger this number is, the larger the + // bloom filter will be. 
+ ExpectedBloomFilterElements uint64 `json:"expected-bloom-filter-elements"` + // ExpectedBloomFilterFalsePositiveProbability is the expected probability + // of a false positive after having inserted ExpectedBloomFilterElements + // into a bloom filter. The smaller this number is, the larger the bloom + // filter will be. + ExpectedBloomFilterFalsePositiveProbability float64 `json:"expected-bloom-filter-false-positive-probability"` + // MaxBloomFilterFalsePositiveProbability is used to determine when the + // bloom filter should be refreshed. Once the expected probability of a + // false positive exceeds this value, the bloom filter will be regenerated. + // The smaller this number is, the more frequently the bloom filter will + // be regenerated. + MaxBloomFilterFalsePositiveProbability float64 `json:"max-bloom-filter-false-positive-probability"` + // LegacyPushGossipCacheSize is the size of the cache that tracks the most + // recently received transactions so that each one is only gossiped once. + // + // Deprecated: The legacy push gossip mechanism is deprecated in favor of + // the p2p SDK's push gossip mechanism. + LegacyPushGossipCacheSize int `json:"legacy-push-gossip-cache-size"` +} diff --git a/vms/avm/network/gossip.go b/vms/avm/network/gossip.go new file mode 100644 index 000000000000..e4e145d830eb --- /dev/null +++ b/vms/avm/network/gossip.go @@ -0,0 +1,157 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package network + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/vms/avm/txs" + "github.com/ava-labs/avalanchego/vms/avm/txs/mempool" +) + +var ( + _ p2p.Handler = (*txGossipHandler)(nil) + _ gossip.Set[*txs.Tx] = (*gossipMempool)(nil) + _ gossip.Marshaller[*txs.Tx] = (*txParser)(nil) +) + +// txGossipHandler is the handler called when serving gossip messages +type txGossipHandler struct { + p2p.NoOpHandler + appGossipHandler p2p.Handler + appRequestHandler p2p.Handler +} + +func (t txGossipHandler) AppGossip( + ctx context.Context, + nodeID ids.NodeID, + gossipBytes []byte, +) { + t.appGossipHandler.AppGossip(ctx, nodeID, gossipBytes) +} + +func (t txGossipHandler) AppRequest( + ctx context.Context, + nodeID ids.NodeID, + deadline time.Time, + requestBytes []byte, +) ([]byte, error) { + return t.appRequestHandler.AppRequest(ctx, nodeID, deadline, requestBytes) +} + +type txParser struct { + parser txs.Parser +} + +func (*txParser) MarshalGossip(tx *txs.Tx) ([]byte, error) { + return tx.Bytes(), nil +} + +func (g *txParser) UnmarshalGossip(bytes []byte) (*txs.Tx, error) { + return g.parser.ParseTx(bytes) +} + +func newGossipMempool( + mempool mempool.Mempool, + log logging.Logger, + txVerifier TxVerifier, + parser txs.Parser, + maxExpectedElements uint64, + falsePositiveProbability, + maxFalsePositiveProbability float64, +) (*gossipMempool, error) { + bloom, err := gossip.NewBloomFilter(maxExpectedElements, falsePositiveProbability) + return &gossipMempool{ + Mempool: mempool, + log: log, + txVerifier: txVerifier, + parser: parser, + maxFalsePositiveProbability: maxFalsePositiveProbability, + bloom: bloom, + }, err +} + +type gossipMempool struct { + mempool.Mempool + log logging.Logger + txVerifier TxVerifier + parser txs.Parser + 
maxFalsePositiveProbability float64 + + lock sync.RWMutex + bloom *gossip.BloomFilter +} + +// Add is called by the p2p SDK when handling transactions that were pushed to +// us and when handling transactions that were pulled from a peer. If this +// returns a nil error while handling push gossip, the p2p SDK will queue the +// transaction to push gossip as well. +func (g *gossipMempool) Add(tx *txs.Tx) error { + txID := tx.ID() + if _, ok := g.Mempool.Get(txID); ok { + return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID) + } + + if reason := g.Mempool.GetDropReason(txID); reason != nil { + // If the tx is being dropped - just ignore it + // + // TODO: Should we allow re-verification of the transaction even if it + // failed previously? + return reason + } + + // Verify the tx at the currently preferred state + if err := g.txVerifier.VerifyTx(tx); err != nil { + g.Mempool.MarkDropped(txID, err) + return err + } + + return g.AddVerified(tx) +} + +func (g *gossipMempool) AddVerified(tx *txs.Tx) error { + if err := g.Mempool.Add(tx); err != nil { + g.Mempool.MarkDropped(tx.ID(), err) + return err + } + + g.lock.Lock() + defer g.lock.Unlock() + + g.bloom.Add(tx) + reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.maxFalsePositiveProbability) + if err != nil { + return err + } + + if reset { + g.log.Debug("resetting bloom filter") + g.Mempool.Iterate(func(tx *txs.Tx) bool { + g.bloom.Add(tx) + return true + }) + } + + g.Mempool.RequestBuildBlock() + return nil +} + +func (g *gossipMempool) Iterate(f func(*txs.Tx) bool) { + g.Mempool.Iterate(f) +} + +func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte, err error) { + g.lock.RLock() + defer g.lock.RUnlock() + + bloomBytes, err := g.bloom.Bloom.MarshalBinary() + return bloomBytes, g.bloom.Salt[:], err +} diff --git a/vms/avm/network/gossip_test.go b/vms/avm/network/gossip_test.go new file mode 100644 index 000000000000..a3922a23d812 --- /dev/null +++ 
b/vms/avm/network/gossip_test.go @@ -0,0 +1,128 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package network + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/vms/avm/fxs" + "github.com/ava-labs/avalanchego/vms/avm/txs" + "github.com/ava-labs/avalanchego/vms/avm/txs/mempool" + "github.com/ava-labs/avalanchego/vms/components/avax" + "github.com/ava-labs/avalanchego/vms/secp256k1fx" +) + +var _ TxVerifier = (*testVerifier)(nil) + +type testVerifier struct { + err error +} + +func (v testVerifier) VerifyTx(*txs.Tx) error { + return v.err +} + +func TestMarshaller(t *testing.T) { + require := require.New(t) + + parser, err := txs.NewParser([]fxs.Fx{ + &secp256k1fx.Fx{}, + }) + require.NoError(err) + + marhsaller := txParser{ + parser: parser, + } + + want := &txs.Tx{Unsigned: &txs.BaseTx{}} + require.NoError(want.Initialize(parser.Codec())) + + bytes, err := marhsaller.MarshalGossip(want) + require.NoError(err) + + got, err := marhsaller.UnmarshalGossip(bytes) + require.NoError(err) + require.Equal(want.GossipID(), got.GossipID()) +} + +func TestGossipMempoolAdd(t *testing.T) { + require := require.New(t) + + metrics := prometheus.NewRegistry() + toEngine := make(chan common.Message, 1) + + baseMempool, err := mempool.New("", metrics, toEngine) + require.NoError(err) + + parser, err := txs.NewParser(nil) + require.NoError(err) + + mempool, err := newGossipMempool( + baseMempool, + logging.NoLog{}, + testVerifier{}, + parser, + DefaultConfig.ExpectedBloomFilterElements, + DefaultConfig.ExpectedBloomFilterFalsePositiveProbability, + DefaultConfig.MaxBloomFilterFalsePositiveProbability, + ) + require.NoError(err) + + tx := &txs.Tx{ + Unsigned: 
&txs.BaseTx{ + BaseTx: avax.BaseTx{ + Ins: []*avax.TransferableInput{}, + }, + }, + TxID: ids.GenerateTestID(), + } + + require.NoError(mempool.Add(tx)) + require.True(mempool.bloom.Has(tx)) +} + +func TestGossipMempoolAddVerified(t *testing.T) { + require := require.New(t) + + metrics := prometheus.NewRegistry() + toEngine := make(chan common.Message, 1) + + baseMempool, err := mempool.New("", metrics, toEngine) + require.NoError(err) + + parser, err := txs.NewParser(nil) + require.NoError(err) + + mempool, err := newGossipMempool( + baseMempool, + logging.NoLog{}, + testVerifier{ + err: errTest, // We shouldn't be attempting to verify the tx in this flow + }, + parser, + DefaultConfig.ExpectedBloomFilterElements, + DefaultConfig.ExpectedBloomFilterFalsePositiveProbability, + DefaultConfig.MaxBloomFilterFalsePositiveProbability, + ) + require.NoError(err) + + tx := &txs.Tx{ + Unsigned: &txs.BaseTx{ + BaseTx: avax.BaseTx{ + Ins: []*avax.TransferableInput{}, + }, + }, + TxID: ids.GenerateTestID(), + } + + require.NoError(mempool.AddVerified(tx)) + require.True(mempool.bloom.Has(tx)) +} diff --git a/vms/avm/network/network.go b/vms/avm/network/network.go index b29e80c98124..9057ba1df346 100644 --- a/vms/avm/network/network.go +++ b/vms/avm/network/network.go @@ -5,57 +5,43 @@ package network import ( "context" - "fmt" "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/vms/avm/txs" "github.com/ava-labs/avalanchego/vms/avm/txs/mempool" "github.com/ava-labs/avalanchego/vms/components/message" ) -// We allow [recentTxsCacheSize] to be fairly large because we only store hashes -// in 
the cache, not entire transactions. -const recentTxsCacheSize = 512 - -var _ Network = (*network)(nil) - -type Network interface { - common.AppHandler - - // IssueTx attempts to add a tx to the mempool, after verifying it against - // the preferred state. If the tx is added to the mempool, it will attempt - // to push gossip the tx to random peers in the network. - // - // If the tx is already in the mempool, mempool.ErrDuplicateTx will be - // returned. - // If the tx is not added to the mempool, an error will be returned. - IssueTx(context.Context, *txs.Tx) error - - // IssueVerifiedTx attempts to add a tx to the mempool. If the tx is added - // to the mempool, it will attempt to push gossip the tx to random peers in - // the network. - // - // If the tx is already in the mempool, mempool.ErrDuplicateTx will be - // returned. - // If the tx is not added to the mempool, an error will be returned. - IssueVerifiedTx(context.Context, *txs.Tx) error -} +const txGossipHandlerID = 0 + +var ( + _ common.AppHandler = (*Network)(nil) + _ validators.Connector = (*Network)(nil) +) -type network struct { - // We embed a noop handler for all unhandled messages - common.AppHandler +type Network struct { + *p2p.Network - ctx *snow.Context - parser txs.Parser - txVerifier TxVerifier - mempool mempool.Mempool - appSender common.AppSender + txPushGossiper gossip.Accumulator[*txs.Tx] + txPullGossiper gossip.Gossiper + txPullGossipFrequency time.Duration + + ctx *snow.Context + parser txs.Parser + mempool *gossipMempool + appSender common.AppSender // gossip related attributes recentTxsLock sync.Mutex @@ -68,23 +54,124 @@ func New( txVerifier TxVerifier, mempool mempool.Mempool, appSender common.AppSender, -) Network { - return &network{ - AppHandler: common.NewNoOpAppHandler(ctx.Log), + registerer prometheus.Registerer, + config Config, +) (*Network, error) { + p2pNetwork, err := p2p.NewNetwork(ctx.Log, appSender, registerer, "p2p") + if err != nil { + return nil, err + } + + 
marshaller := &txParser{ + parser: parser, + } + validators := p2p.NewValidators( + p2pNetwork.Peers, + ctx.Log, + ctx.SubnetID, + ctx.ValidatorState, + config.MaxValidatorSetStaleness, + ) + txGossipClient := p2pNetwork.NewClient( + txGossipHandlerID, + p2p.WithValidatorSampling(validators), + ) + txGossipMetrics, err := gossip.NewMetrics(registerer, "tx") + if err != nil { + return nil, err + } + + txPushGossiper := gossip.NewPushGossiper[*txs.Tx]( + marshaller, + txGossipClient, + txGossipMetrics, + config.TargetGossipSize, + ) + + gossipMempool, err := newGossipMempool( + mempool, + ctx.Log, + txVerifier, + parser, + config.ExpectedBloomFilterElements, + config.ExpectedBloomFilterFalsePositiveProbability, + config.MaxBloomFilterFalsePositiveProbability, + ) + if err != nil { + return nil, err + } + + var txPullGossiper gossip.Gossiper + txPullGossiper = gossip.NewPullGossiper[*txs.Tx]( + ctx.Log, + marshaller, + gossipMempool, + txGossipClient, + txGossipMetrics, + config.PullGossipPollSize, + ) + + // Gossip requests are only served if a node is a validator + txPullGossiper = gossip.ValidatorGossiper{ + Gossiper: txPullGossiper, + NodeID: ctx.NodeID, + Validators: validators, + } + + handler := gossip.NewHandler[*txs.Tx]( + ctx.Log, + marshaller, + txPushGossiper, + gossipMempool, + txGossipMetrics, + config.TargetGossipSize, + ) + + validatorHandler := p2p.NewValidatorHandler( + p2p.NewThrottlerHandler( + handler, + p2p.NewSlidingWindowThrottler( + config.PullGossipThrottlingPeriod, + config.PullGossipThrottlingLimit, + ), + ctx.Log, + ), + validators, + ctx.Log, + ) + + // We allow pushing txs between all peers, but only serve gossip requests + // from validators + txGossipHandler := txGossipHandler{ + appGossipHandler: handler, + appRequestHandler: validatorHandler, + } + + if err := p2pNetwork.AddHandler(txGossipHandlerID, txGossipHandler); err != nil { + return nil, err + } - ctx: ctx, - parser: parser, - txVerifier: txVerifier, - mempool: mempool, - 
appSender: appSender, + return &Network{ + Network: p2pNetwork, + txPushGossiper: txPushGossiper, + txPullGossiper: txPullGossiper, + txPullGossipFrequency: config.PullGossipFrequency, + ctx: ctx, + parser: parser, + mempool: gossipMempool, + appSender: appSender, recentTxs: &cache.LRU[ids.ID, struct{}]{ - Size: recentTxsCacheSize, + Size: config.LegacyPushGossipCacheSize, }, - } + }, nil +} + +func (n *Network) Gossip(ctx context.Context) { + gossip.Every(ctx, n.ctx.Log, n.txPullGossiper, n.txPullGossipFrequency) } -func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []byte) error { +func (n *Network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []byte) error { n.ctx.Log.Debug("called AppGossip message handler", zap.Stringer("nodeID", nodeID), zap.Int("messageLen", len(msgBytes)), @@ -92,10 +179,11 @@ func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []b msgIntf, err := message.Parse(msgBytes) if err != nil { - n.ctx.Log.Debug("dropping AppGossip message", + n.ctx.Log.Debug("forwarding AppGossip message to SDK network", zap.String("reason", "failed to parse message"), ) - return nil + + return n.Network.AppGossip(ctx, nodeID, msgBytes) } msg, ok := msgIntf.(*message.Tx) @@ -116,71 +204,59 @@ func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []b return nil } - if err := n.issueTx(tx); err == nil { + err = n.mempool.Add(tx) + if err == nil { txID := tx.ID() + n.txPushGossiper.Add(tx) + if err := n.txPushGossiper.Gossip(ctx); err != nil { + n.ctx.Log.Error("failed to gossip tx", + zap.Stringer("txID", tx.ID()), + zap.Error(err), + ) + } n.gossipTxMessage(ctx, txID, msgBytes) } return nil } -func (n *network) IssueTx(ctx context.Context, tx *txs.Tx) error { - if err := n.issueTx(tx); err != nil { +// IssueTx attempts to add a tx to the mempool, after verifying it. 
If the tx is +// added to the mempool, it will attempt to push gossip the tx to random peers +// in the network using both the legacy and p2p SDK. +// +// If the tx is already in the mempool, mempool.ErrDuplicateTx will be +// returned. +// If the tx is not added to the mempool, an error will be returned. +func (n *Network) IssueTx(ctx context.Context, tx *txs.Tx) error { + if err := n.mempool.Add(tx); err != nil { return err } return n.gossipTx(ctx, tx) } -func (n *network) IssueVerifiedTx(ctx context.Context, tx *txs.Tx) error { - if err := n.issueVerifiedTx(tx); err != nil { +// IssueVerifiedTx attempts to add a tx to the mempool, without first verifying +// it. If the tx is added to the mempool, it will attempt to push gossip the tx +// to random peers in the network using both the legacy and p2p SDK. +// +// If the tx is already in the mempool, mempool.ErrDuplicateTx will be +// returned. +// If the tx is not added to the mempool, an error will be returned. +func (n *Network) IssueVerifiedTx(ctx context.Context, tx *txs.Tx) error { + if err := n.mempool.AddVerified(tx); err != nil { return err } return n.gossipTx(ctx, tx) } -func (n *network) issueTx(tx *txs.Tx) error { - txID := tx.ID() - if _, ok := n.mempool.Get(txID); ok { - return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID) - } - - if reason := n.mempool.GetDropReason(txID); reason != nil { - // If the tx is being dropped - just ignore it - // - // TODO: Should we allow re-verification of the transaction even if it - // failed previously? 
- return reason - } - - if err := n.txVerifier.VerifyTx(tx); err != nil { - n.ctx.Log.Debug("tx failed verification", - zap.Stringer("txID", txID), - zap.Error(err), - ) - - n.mempool.MarkDropped(txID, err) - return err - } - - return n.issueVerifiedTx(tx) -} - -func (n *network) issueVerifiedTx(tx *txs.Tx) error { - if err := n.mempool.Add(tx); err != nil { - txID := tx.ID() - n.ctx.Log.Debug("tx failed to be added to the mempool", - zap.Stringer("txID", txID), +// gossipTx pushes the tx to peers using both the legacy and p2p SDK. +func (n *Network) gossipTx(ctx context.Context, tx *txs.Tx) error { + n.txPushGossiper.Add(tx) + if err := n.txPushGossiper.Gossip(ctx); err != nil { + n.ctx.Log.Error("failed to gossip tx", + zap.Stringer("txID", tx.ID()), zap.Error(err), ) - - n.mempool.MarkDropped(txID, err) - return err } - n.mempool.RequestBuildBlock() - return nil -} - -func (n *network) gossipTx(ctx context.Context, tx *txs.Tx) error { txBytes := tx.Bytes() msg := &message.Tx{ Tx: txBytes, @@ -195,7 +271,9 @@ func (n *network) gossipTx(ctx context.Context, tx *txs.Tx) error { return nil } -func (n *network) gossipTxMessage(ctx context.Context, txID ids.ID, msgBytes []byte) { +// gossipTxMessage pushes the tx message to peers using the legacy format. +// If the tx was recently gossiped, this function does nothing. 
+func (n *Network) gossipTxMessage(ctx context.Context, txID ids.ID, msgBytes []byte) { n.recentTxsLock.Lock() _, has := n.recentTxs.Get(txID) n.recentTxs.Put(txID, struct{}{}) diff --git a/vms/avm/network/network_test.go b/vms/avm/network/network_test.go index 509b35ec0f13..4250a82b8bcb 100644 --- a/vms/avm/network/network_test.go +++ b/vms/avm/network/network_test.go @@ -7,6 +7,9 @@ import ( "context" "errors" "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -27,7 +30,22 @@ import ( "github.com/ava-labs/avalanchego/vms/secp256k1fx" ) -var errTest = errors.New("test error") +var ( + testConfig = Config{ + MaxValidatorSetStaleness: time.Second, + TargetGossipSize: 1, + PullGossipPollSize: 1, + PullGossipFrequency: time.Second, + PullGossipThrottlingPeriod: time.Second, + PullGossipThrottlingLimit: 1, + ExpectedBloomFilterElements: 10, + ExpectedBloomFilterFalsePositiveProbability: .1, + MaxBloomFilterFalsePositiveProbability: .5, + LegacyPushGossipCacheSize: 512, + } + + errTest = errors.New("test error") +) func TestNetworkAppGossip(t *testing.T) { testTx := &txs.Tx{ @@ -154,7 +172,7 @@ func TestNetworkAppGossip(t *testing.T) { }, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil) + appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil).Times(2) return appSender }, }, @@ -193,7 +211,7 @@ func TestNetworkAppGossip(t *testing.T) { appSenderFunc = tt.appSenderFunc } - n := New( + n, err := New( &snow.Context{ Log: logging.NoLog{}, }, @@ -201,7 +219,10 @@ func TestNetworkAppGossip(t *testing.T) { txVerifierFunc(ctrl), mempoolFunc(ctrl), appSenderFunc(ctrl), + prometheus.NewRegistry(), + testConfig, ) + require.NoError(err) require.NoError(n.AppGossip(context.Background(), ids.GenerateTestNodeID(), tt.msgBytesFunc())) }) } @@ -286,7 +307,7 @@ func 
TestNetworkIssueTx(t *testing.T) { }, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil) + appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil).Times(2) return appSender }, expectedErr: nil, @@ -326,7 +347,7 @@ func TestNetworkIssueTx(t *testing.T) { appSenderFunc = tt.appSenderFunc } - n := New( + n, err := New( &snow.Context{ Log: logging.NoLog{}, }, @@ -334,7 +355,10 @@ func TestNetworkIssueTx(t *testing.T) { txVerifierFunc(ctrl), mempoolFunc(ctrl), appSenderFunc(ctrl), + prometheus.NewRegistry(), + testConfig, ) + require.NoError(err) err = n.IssueTx(context.Background(), &txs.Tx{}) require.ErrorIs(err, tt.expectedErr) }) @@ -370,7 +394,7 @@ func TestNetworkIssueVerifiedTx(t *testing.T) { }, appSenderFunc: func(ctrl *gomock.Controller) common.AppSender { appSender := common.NewMockSender(ctrl) - appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil) + appSender.EXPECT().SendAppGossip(gomock.Any(), gomock.Any()).Return(nil).Times(2) return appSender }, expectedErr: nil, @@ -403,7 +427,7 @@ func TestNetworkIssueVerifiedTx(t *testing.T) { appSenderFunc = tt.appSenderFunc } - n := New( + n, err := New( &snow.Context{ Log: logging.NoLog{}, }, @@ -411,7 +435,10 @@ func TestNetworkIssueVerifiedTx(t *testing.T) { executor.NewMockManager(ctrl), // Should never verify a tx mempoolFunc(ctrl), appSenderFunc(ctrl), + prometheus.NewRegistry(), + testConfig, ) + require.NoError(err) err = n.IssueVerifiedTx(context.Background(), &txs.Tx{}) require.ErrorIs(err, tt.expectedErr) }) @@ -429,7 +456,7 @@ func TestNetworkGossipTx(t *testing.T) { appSender := common.NewMockSender(ctrl) - nIntf := New( + n, err := New( &snow.Context{ Log: logging.NoLog{}, }, @@ -437,9 +464,10 @@ func TestNetworkGossipTx(t *testing.T) { executor.NewMockManager(ctrl), mempool.NewMockMempool(ctrl), appSender, + prometheus.NewRegistry(), + 
testConfig, ) - require.IsType(&network{}, nIntf) - n := nIntf.(*network) + require.NoError(err) // Case: Tx was recently gossiped txID := ids.GenerateTestID() diff --git a/vms/avm/txs/tx.go b/vms/avm/txs/tx.go index 6025828a5dcb..7b900987692a 100644 --- a/vms/avm/txs/tx.go +++ b/vms/avm/txs/tx.go @@ -8,6 +8,7 @@ import ( "github.com/ava-labs/avalanchego/codec" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/hashing" @@ -19,6 +20,8 @@ import ( "github.com/ava-labs/avalanchego/vms/secp256k1fx" ) +var _ gossip.Gossipable = (*Tx)(nil) + type UnsignedTx interface { snow.ContextInitializable @@ -75,6 +78,11 @@ func (t *Tx) ID() ids.ID { return t.TxID } +// GossipID returns the unique ID that this tx should use for mempool gossip +func (t *Tx) GossipID() ids.ID { + return t.TxID +} + // Bytes returns the binary representation of this tx func (t *Tx) Bytes() []byte { return t.bytes diff --git a/vms/avm/vm.go b/vms/avm/vm.go index 93467ad877ef..699f95a6e7be 100644 --- a/vms/avm/vm.go +++ b/vms/avm/vm.go @@ -9,8 +9,7 @@ import ( "fmt" "net/http" "reflect" - - stdjson "encoding/json" + "sync" "github.com/gorilla/rpc/v2" @@ -83,6 +82,8 @@ type VM struct { registerer prometheus.Registerer + connectedPeers map[ids.NodeID]*version.Application + parser block.Parser pubsub *pubsub.Server @@ -113,19 +114,35 @@ type VM struct { txBackend *txexecutor.Backend + context context.Context + startShutdown context.CancelFunc + awaitShutdown sync.WaitGroup + + networkConfig network.Config // These values are only initialized after the chain has been linearized. 
- mempool mempool.Mempool blockbuilder.Builder chainManager blockexecutor.Manager - network network.Network + network *network.Network } -func (*VM) Connected(context.Context, ids.NodeID, *version.Application) error { - return nil +func (vm *VM) Connected(ctx context.Context, nodeID ids.NodeID, version *version.Application) error { + // If the chain isn't linearized yet, we must track the peers externally + // until the network is initialized. + if vm.network == nil { + vm.connectedPeers[nodeID] = version + return nil + } + return vm.network.Connected(ctx, nodeID, version) } -func (*VM) Disconnected(context.Context, ids.NodeID) error { - return nil +func (vm *VM) Disconnected(ctx context.Context, nodeID ids.NodeID) error { + // If the chain isn't linearized yet, we must track the peers externally + // until the network is initialized. + if vm.network == nil { + delete(vm.connectedPeers, nodeID) + return nil + } + return vm.network.Disconnected(ctx, nodeID) } /* @@ -134,12 +151,6 @@ func (*VM) Disconnected(context.Context, ids.NodeID) error { ****************************************************************************** */ -type Config struct { - IndexTransactions bool `json:"index-transactions"` - IndexAllowIncomplete bool `json:"index-allow-incomplete"` - ChecksumsEnabled bool `json:"checksums-enabled"` -} - func (vm *VM) Initialize( _ context.Context, ctx *snow.Context, @@ -154,15 +165,13 @@ func (vm *VM) Initialize( noopMessageHandler := common.NewNoOpAppHandler(ctx.Log) vm.Atomic = network.NewAtomic(noopMessageHandler) - avmConfig := Config{} - if len(configBytes) > 0 { - if err := stdjson.Unmarshal(configBytes, &avmConfig); err != nil { - return err - } - ctx.Log.Info("VM config initialized", - zap.Reflect("config", avmConfig), - ) + avmConfig, err := ParseConfig(configBytes) + if err != nil { + return err } + ctx.Log.Info("VM config initialized", + zap.Reflect("config", avmConfig), + ) registerer := prometheus.NewRegistry() if err := 
ctx.Metrics.Register(registerer); err != nil { @@ -170,8 +179,9 @@ func (vm *VM) Initialize( } vm.registerer = registerer + vm.connectedPeers = make(map[ids.NodeID]*version.Application) + // Initialize metrics as soon as possible - var err error vm.metrics, err = metrics.New("", registerer) if err != nil { return fmt.Errorf("failed to initialize metrics: %w", err) @@ -264,6 +274,8 @@ func (vm *VM) Initialize( Bootstrapped: false, } + vm.context, vm.startShutdown = context.WithCancel(context.Background()) + vm.networkConfig = avmConfig.Network return vm.state.Commit() } @@ -306,6 +318,9 @@ func (vm *VM) Shutdown(context.Context) error { return nil } + vm.startShutdown() + vm.awaitShutdown.Wait() + return utils.Err( vm.state.Close(), vm.baseDB.Close(), @@ -396,20 +411,20 @@ func (*VM) VerifyHeightIndex(context.Context) error { ****************************************************************************** */ -func (vm *VM) Linearize(_ context.Context, stopVertexID ids.ID, toEngine chan<- common.Message) error { +func (vm *VM) Linearize(ctx context.Context, stopVertexID ids.ID, toEngine chan<- common.Message) error { time := version.GetCortinaTime(vm.ctx.NetworkID) err := vm.state.InitializeChainState(stopVertexID, time) if err != nil { return err } - vm.mempool, err = mempool.New("mempool", vm.registerer, toEngine) + mempool, err := mempool.New("mempool", vm.registerer, toEngine) if err != nil { return fmt.Errorf("failed to create mempool: %w", err) } vm.chainManager = blockexecutor.NewManager( - vm.mempool, + mempool, vm.metrics, vm.state, vm.txBackend, @@ -421,26 +436,47 @@ func (vm *VM) Linearize(_ context.Context, stopVertexID ids.ID, toEngine chan<- vm.txBackend, vm.chainManager, &vm.clock, - vm.mempool, + mempool, ) // Invariant: The context lock is not held when calling network.IssueTx. 
- vm.network = network.New( + vm.network, err = network.New( vm.ctx, vm.parser, network.NewLockedTxVerifier( &vm.ctx.Lock, vm.chainManager, ), - vm.mempool, + mempool, vm.appSender, + vm.registerer, + vm.networkConfig, ) + if err != nil { + return fmt.Errorf("failed to initialize network: %w", err) + } + + // Notify the network of our current peers + for nodeID, version := range vm.connectedPeers { + if err := vm.network.Connected(ctx, nodeID, version); err != nil { + return err + } + } + vm.connectedPeers = nil // Note: It's important only to switch the networking stack after the full // chainVM has been initialized. Traffic will immediately start being // handled asynchronously. vm.Atomic.Set(vm.network) + vm.awaitShutdown.Add(1) + go func() { + defer vm.awaitShutdown.Done() + + // Invariant: Gossip must never grab the context lock. + vm.network.Gossip(vm.context) + }() + go func() { err := vm.state.Prune(&vm.ctx.Lock, vm.ctx.Log) if err != nil { diff --git a/vms/avm/vm_test.go b/vms/avm/vm_test.go index ae4a2f0c0af0..ce7c6c43ea61 100644 --- a/vms/avm/vm_test.go +++ b/vms/avm/vm_test.go @@ -35,10 +35,7 @@ func TestInvalidGenesis(t *testing.T) { vm := &VM{} ctx := snowtest.Context(t, snowtest.XChainID) ctx.Lock.Lock() - defer func() { - require.NoError(vm.Shutdown(context.Background())) - ctx.Lock.Unlock() - }() + defer ctx.Lock.Unlock() err := vm.Initialize( context.Background(),