Skip to content

Commit

Permalink
Merge branch 'dev' into e2e-add-bootstrap-check
Browse files Browse the repository at this point in the history
  • Loading branch information
marun authored Sep 5, 2023
2 parents e5b9a3c + 484a72f commit 862834f
Show file tree
Hide file tree
Showing 11 changed files with 948 additions and 40 deletions.
130 changes: 130 additions & 0 deletions network/p2p/gossip/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package gossip

import (
"crypto/rand"
"encoding/binary"
"hash"

bloomfilter "github.com/holiman/bloomfilter/v2"

"github.com/ava-labs/avalanchego/ids"
)

var _ hash.Hash64 = (*hasher)(nil)

// NewBloomFilter returns a new instance of a bloom filter with at most
// [maxExpectedElements] elements anticipated at any moment, and a false
// positive probability of [falsePositiveProbability].
func NewBloomFilter(
maxExpectedElements uint64,
falsePositiveProbability float64,
) (*BloomFilter, error) {
bloom, err := bloomfilter.NewOptimal(
maxExpectedElements,
falsePositiveProbability,
)
if err != nil {
return nil, err
}

salt, err := randomSalt()
return &BloomFilter{
Bloom: bloom,
Salt: salt,
}, err
}

type BloomFilter struct {
Bloom *bloomfilter.Filter
// Salt is provided to eventually unblock collisions in Bloom. It's possible
// that conflicting Gossipable items collide in the bloom filter, so a salt
// is generated to eventually resolve collisions.
Salt ids.ID
}

func (b *BloomFilter) Add(gossipable Gossipable) {
h := gossipable.GetID()
salted := &hasher{
hash: h[:],
salt: b.Salt,
}
b.Bloom.Add(salted)
}

func (b *BloomFilter) Has(gossipable Gossipable) bool {
h := gossipable.GetID()
salted := &hasher{
hash: h[:],
salt: b.Salt,
}
return b.Bloom.Contains(salted)
}

// ResetBloomFilterIfNeeded resets a bloom filter if it breaches a target false
// positive probability. Returns true if the bloom filter was reset.
func ResetBloomFilterIfNeeded(
bloomFilter *BloomFilter,
falsePositiveProbability float64,
) (bool, error) {
if bloomFilter.Bloom.FalsePosititveProbability() < falsePositiveProbability {
return false, nil
}

newBloom, err := bloomfilter.New(bloomFilter.Bloom.M(), bloomFilter.Bloom.K())
if err != nil {
return false, err
}
salt, err := randomSalt()
if err != nil {
return false, err
}

bloomFilter.Bloom = newBloom
bloomFilter.Salt = salt
return true, nil
}

func randomSalt() (ids.ID, error) {
salt := ids.ID{}
_, err := rand.Read(salt[:])
return salt, err
}

type hasher struct {
hash []byte
salt ids.ID
}

func (h *hasher) Write(p []byte) (n int, err error) {
h.hash = append(h.hash, p...)
return len(p), nil
}

func (h *hasher) Sum(b []byte) []byte {
h.hash = append(h.hash, b...)
return h.hash
}

func (h *hasher) Reset() {
h.hash = ids.Empty[:]
}

func (*hasher) BlockSize() int {
return ids.IDLen
}

func (h *hasher) Sum64() uint64 {
salted := ids.ID{}
for i := 0; i < len(h.hash) && i < ids.IDLen; i++ {
salted[i] = h.hash[i] ^ h.salt[i]
}

return binary.BigEndian.Uint64(salted[:])
}

func (h *hasher) Size() int {
return len(h.hash)
}
68 changes: 68 additions & 0 deletions network/p2p/gossip/bloom_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package gossip

import (
"testing"

bloomfilter "github.com/holiman/bloomfilter/v2"

"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
)

func TestBloomFilterRefresh(t *testing.T) {
tests := []struct {
name string
falsePositiveProbability float64
add []*testTx
expected []*testTx
}{
{
name: "no refresh",
falsePositiveProbability: 1,
add: []*testTx{
{id: ids.ID{0}},
},
expected: []*testTx{
{id: ids.ID{0}},
},
},
{
name: "refresh",
falsePositiveProbability: 0.1,
add: []*testTx{
{id: ids.ID{0}},
{id: ids.ID{1}},
},
expected: []*testTx{
{id: ids.ID{1}},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
b, err := bloomfilter.New(10, 1)
require.NoError(err)
bloom := BloomFilter{
Bloom: b,
}

for _, item := range tt.add {
_, err = ResetBloomFilterIfNeeded(&bloom, tt.falsePositiveProbability)
require.NoError(err)
bloom.Add(item)
}

require.Equal(uint64(len(tt.expected)), bloom.Bloom.N())

for _, expected := range tt.expected {
require.True(bloom.Has(expected))
}
})
}
}
141 changes: 141 additions & 0 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package gossip

import (
"context"
"time"

"go.uber.org/zap"

"google.golang.org/protobuf/proto"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/utils/logging"
)

// GossipableAny exists to help create non-nil pointers to a concrete Gossipable
// ref: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go
type GossipableAny[T any] interface {
*T
Gossipable
}

type Config struct {
Frequency time.Duration
PollSize int
}

func NewGossiper[T any, U GossipableAny[T]](
config Config,
log logging.Logger,
set Set[U],
client *p2p.Client,
) *Gossiper[T, U] {
return &Gossiper[T, U]{
config: config,
log: log,
set: set,
client: client,
}
}

type Gossiper[T any, U GossipableAny[T]] struct {
config Config
log logging.Logger
set Set[U]
client *p2p.Client
}

func (g *Gossiper[_, _]) Gossip(ctx context.Context) {
gossipTicker := time.NewTicker(g.config.Frequency)
defer gossipTicker.Stop()

for {
select {
case <-gossipTicker.C:
if err := g.gossip(ctx); err != nil {
g.log.Warn("failed to gossip", zap.Error(err))
}
case <-ctx.Done():
g.log.Debug("shutting down gossip")
return
}
}
}

func (g *Gossiper[_, _]) gossip(ctx context.Context) error {
bloom, salt, err := g.set.GetFilter()
if err != nil {
return err
}

request := &sdk.PullGossipRequest{
Filter: bloom,
Salt: salt,
}
msgBytes, err := proto.Marshal(request)
if err != nil {
return err
}

for i := 0; i < g.config.PollSize; i++ {
if err := g.client.AppRequestAny(ctx, msgBytes, g.handleResponse); err != nil {
return err
}
}

return nil
}

func (g *Gossiper[T, U]) handleResponse(
nodeID ids.NodeID,
responseBytes []byte,
err error,
) {
if err != nil {
g.log.Debug(
"failed gossip request",
zap.Stringer("nodeID", nodeID),
zap.Error(err),
)
return
}

response := &sdk.PullGossipResponse{}
if err := proto.Unmarshal(responseBytes, response); err != nil {
g.log.Debug("failed to unmarshal gossip response", zap.Error(err))
return
}

for _, bytes := range response.Gossip {
gossipable := U(new(T))
if err := gossipable.Unmarshal(bytes); err != nil {
g.log.Debug(
"failed to unmarshal gossip",
zap.Stringer("nodeID", nodeID),
zap.Error(err),
)
continue
}

hash := gossipable.GetID()
g.log.Debug(
"received gossip",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", hash),
)
if err := g.set.Add(gossipable); err != nil {
g.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", hash),
zap.Error(err),
)
continue
}
}
}
Loading

0 comments on commit 862834f

Please sign in to comment.