Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p/discover: improve table addition code #18974

Merged
merged 2 commits into from
Jan 31, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 80 additions & 35 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
// just remove those again during revalidation.
for _, n := range r {
tab.add(n)
tab.addSeenNode(n)
}
reply <- r
}
Expand Down Expand Up @@ -443,7 +443,7 @@ func (tab *Table) loadSeedNodes() {
seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.add(seed)
tab.addSeenNode(seed)
}
}

Expand All @@ -468,7 +468,7 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
// The node responded, move it to the front.
last.livenessChecks++
log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks)
b.bump(last)
tab.bumpInBucket(b, last)
return
}
// No reply received, pick a replacement or delete the node if there aren't
Expand Down Expand Up @@ -551,36 +551,81 @@ func (tab *Table) bucket(id enode.ID) *bucket {
return tab.buckets[d-bucketMinDistance-1]
}

// add attempts to add the given node to its corresponding bucket. If the bucket has space
// available, adding the node succeeds immediately. Otherwise, the node is added if the
// least recently active node in the bucket does not respond to a ping packet.
// addSeenNode adds a node which may or may not be live to the end of a bucket. If the
// bucket has space available, adding the node succeeds immediately. Otherwise, the node is
// added to the replacements list.
//
// The caller must not hold tab.mutex.
func (tab *Table) add(n *node) {
func (tab *Table) addSeenNode(n *node) {
if n.ID() == tab.self().ID() {
return
}

tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(n.ID())
if !tab.bumpOrAdd(b, n) {
// Node is not in table. Add it to the replacement list.
if contains(b.entries, n.ID()) {
// Already in bucket, don't add.
return
}
if len(b.entries) >= bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, n)
return
}
if !tab.addIP(b, n.IP()) {
// Can't add: IP limit reached.
return
}
// Add to end of bucket:
b.entries = append(b.entries, n)
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
}
}

// addThroughPing adds the given node to the table. Compared to plain
// 'add' there is an additional safety measure: if the table is still
// initializing the node is not added. This prevents an attack where the
// table could be filled by just sending ping repeatedly.
// addLiveNode adds a node whose existence has been verified recently to the front of a
fjl marked this conversation as resolved.
Show resolved Hide resolved
// bucket. If the node is already in the bucket, it is moved to the front. If the bucket
// has no space, the node is added to the replacements list.
//
// There is an additional safety measure: if the table is still initializing the node
// is not added. This prevents an attack where the table could be filled by just sending
// ping repeatedly.
//
// The caller must not hold tab.mutex.
func (tab *Table) addThroughPing(n *node) {
func (tab *Table) addVerifiedNode(n *node) {
if !tab.isInitDone() {
return
}
tab.add(n)
if n.ID() == tab.self().ID() {
return
}

tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(n.ID())
if tab.bumpInBucket(b, n) {
// Already in bucket, moved to front.
return
}
if len(b.entries) >= bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, n)
return
}
if !tab.addIP(b, n.IP()) {
// Can't add: IP limit reached.
return
}
// Add to front of bucket.
b.entries, _ = pushNode(b.entries, n, bucketSize)
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
}
}

// delete removes an entry from the node table. It is used to evacuate dead nodes.
Expand Down Expand Up @@ -653,10 +698,19 @@ func (tab *Table) replace(b *bucket, last *node) *node {

// bump moves the given node to the front of the bucket entry list
fjl marked this conversation as resolved.
Show resolved Hide resolved
// if it is contained in that list.
func (b *bucket) bump(n *node) bool {
func (tab *Table) bumpInBucket(b *bucket, n *node) bool {
for i := range b.entries {
if b.entries[i].ID() == n.ID() {
// move it to the front
if !n.IP().Equal(b.entries[i].IP()) {
// Endpoint has changed, ensure that the new IP fits into table limits.
tab.removeIP(b, b.entries[i].IP())
if !tab.addIP(b, n.IP()) {
// It doesn't, put the previous one back.
tab.addIP(b, b.entries[i].IP())
return false
}
}
// Move it to the front.
copy(b.entries[1:], b.entries[:i])
b.entries[0] = n
return true
Expand All @@ -665,29 +719,20 @@ func (b *bucket) bump(n *node) bool {
return false
}

// bumpOrAdd moves n to the front of the bucket entry list or adds it if the list isn't
// full. The return value is true if n is in the bucket.
func (tab *Table) bumpOrAdd(b *bucket, n *node) bool {
if b.bump(n) {
return true
}
if len(b.entries) >= bucketSize || !tab.addIP(b, n.IP()) {
return false
}
b.entries, _ = pushNode(b.entries, n, bucketSize)
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
}
return true
}

func (tab *Table) deleteInBucket(b *bucket, n *node) {
b.entries = deleteNode(b.entries, n)
tab.removeIP(b, n.IP())
}

func contains(ns []*node, id enode.ID) bool {
for _, n := range ns {
if n.ID() == id {
return true
}
}
return false
}

// pushNode adds n to the front of list, keeping at most max items.
func pushNode(list []*node, n *node, max int) ([]*node, *node) {
if len(list) < max {
Expand Down
96 changes: 91 additions & 5 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/netutil"
)

func TestTable_pingReplace(t *testing.T) {
Expand Down Expand Up @@ -64,7 +65,7 @@ func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding
// its bucket if it is unresponsive. Revalidate again to ensure that
transport.dead[last.ID()] = !lastInBucketIsResponding
transport.dead[pingSender.ID()] = !newNodeIsResponding
tab.add(pingSender)
tab.addSeenNode(pingSender)
tab.doRevalidate(make(chan struct{}, 1))
tab.doRevalidate(make(chan struct{}, 1))

Expand Down Expand Up @@ -114,10 +115,14 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
}

prop := func(nodes []*node, bumps []int) (ok bool) {
tab, db := newTestTable(newPingRecorder())
defer db.Close()
defer tab.Close()

b := &bucket{entries: make([]*node, len(nodes))}
copy(b.entries, nodes)
for i, pos := range bumps {
b.bump(b.entries[pos])
tab.bumpInBucket(b, b.entries[pos])
if hasDuplicates(b.entries) {
t.Logf("bucket has duplicates after %d/%d bumps:", i+1, len(bumps))
for _, n := range b.entries {
Expand All @@ -126,6 +131,7 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
return false
}
}
checkIPLimitInvariant(t, tab)
return true
}
if err := quick.Check(prop, cfg); err != nil {
Expand All @@ -142,11 +148,12 @@ func TestTable_IPLimit(t *testing.T) {

for i := 0; i < tableIPLimit+1; i++ {
n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)})
tab.add(n)
tab.addSeenNode(n)
}
if tab.len() > tableIPLimit {
t.Errorf("too many nodes in table")
}
checkIPLimitInvariant(t, tab)
}

// This checks that the per-bucket IP limit is applied correctly.
Expand All @@ -159,11 +166,28 @@ func TestTable_BucketIPLimit(t *testing.T) {
d := 3
for i := 0; i < bucketIPLimit+1; i++ {
n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)})
tab.add(n)
tab.addSeenNode(n)
}
if tab.len() > bucketIPLimit {
t.Errorf("too many nodes in table")
}
checkIPLimitInvariant(t, tab)
}

// checkIPLimitInvariant checks that ip limit sets contain an entry for every
// node in the table and no extra entries.
func checkIPLimitInvariant(t *testing.T, tab *Table) {
t.Helper()

tabset := netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}
for _, b := range tab.buckets {
for _, n := range b.entries {
tabset.Add(n.IP())
}
}
if tabset.String() != tab.ips.String() {
t.Errorf("table IP set is incorrect:\nhave: %v\nwant: %v", tab.ips, tabset)
}
}

func TestTable_closest(t *testing.T) {
Expand Down Expand Up @@ -281,6 +305,69 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
return reflect.ValueOf(t)
}

func TestTable_addVerifiedNode(t *testing.T) {
tab, db := newTestTable(newPingRecorder())
<-tab.initDone
defer db.Close()
defer tab.Close()

// Insert two nodes.
n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1})
n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2})
tab.addSeenNode(n1)
tab.addSeenNode(n2)

// Verify bucket content:
bcontent := []*node{n1, n2}
if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries)
}

// Add a changed version of n2.
newrec := n2.Record()
newrec.Set(enr.IP{99, 99, 99, 99})
newn2 := wrapNode(enode.SignNull(newrec, n2.ID()))
tab.addVerifiedNode(newn2)

// Check that bucket is updated correctly.
newBcontent := []*node{newn2, n1}
if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, newBcontent) {
t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries)
}
checkIPLimitInvariant(t, tab)
}

func TestTable_addSeenNode(t *testing.T) {
tab, db := newTestTable(newPingRecorder())
<-tab.initDone
defer db.Close()
defer tab.Close()

// Insert two nodes.
n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1})
n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2})
tab.addSeenNode(n1)
tab.addSeenNode(n2)

// Verify bucket content:
bcontent := []*node{n1, n2}
if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries)
}

// Add a changed version of n2.
newrec := n2.Record()
newrec.Set(enr.IP{99, 99, 99, 99})
newn2 := wrapNode(enode.SignNull(newrec, n2.ID()))
tab.addSeenNode(newn2)

// Check that bucket content is unchanged.
if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries)
}
checkIPLimitInvariant(t, tab)
}

func TestTable_Lookup(t *testing.T) {
tab, db := newTestTable(lookupTestnet)
defer db.Close()
Expand Down Expand Up @@ -535,7 +622,6 @@ func (tn *preminedTestnet) findnode(toid enode.ID, toaddr *net.UDPAddr, target e
}

func (*preminedTestnet) close() {}
func (*preminedTestnet) waitping(from enode.ID) error { return nil }
func (*preminedTestnet) ping(toid enode.ID, toaddr *net.UDPAddr) error { return nil }

// mine generates a testnet struct literal with nodes at
Expand Down
20 changes: 1 addition & 19 deletions p2p/discover/table_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,8 @@ func fillBucket(tab *Table, n *node) (last *node) {
// fillTable adds nodes the table to the end of their corresponding bucket
// if the bucket is not full. The caller must not hold tab.mutex.
func fillTable(tab *Table, nodes []*node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()

for _, n := range nodes {
if n.ID() == tab.self().ID() {
continue // don't add self
}
b := tab.bucket(n.ID())
if len(b.entries) < bucketSize {
tab.bumpOrAdd(b, n)
}
tab.addSeenNode(n)
}
}

Expand Down Expand Up @@ -154,15 +145,6 @@ func hasDuplicates(slice []*node) bool {
return false
}

func contains(ns []*node, id enode.ID) bool {
for _, n := range ns {
if n.ID() == id {
return true
}
}
return false
}

func sortedByDistanceTo(distbase enode.ID, slice []*node) bool {
var last enode.ID
for i, e := range slice {
Expand Down
4 changes: 2 additions & 2 deletions p2p/discover/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,10 +661,10 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte)
n := wrapNode(enode.NewV4(req.senderKey, from.IP, int(req.From.TCP), from.Port))
if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration {
t.sendPing(fromID, from, func() {
t.tab.addThroughPing(n)
t.tab.addVerifiedNode(n)
})
} else {
t.tab.addThroughPing(n)
t.tab.addVerifiedNode(n)
}

// Update node database and endpoint predictor.
Expand Down