Skip to content
This repository has been archived by the owner on Apr 21, 2022. It is now read-only.

Add peer protection capability (implementation) #36

Merged
merged 3 commits into from
Mar 29, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

const silencePeriod = 10 * time.Second
var SilencePeriod = 10 * time.Second

var log = logging.Logger("connmgr")

Expand All @@ -32,9 +32,13 @@ type BasicConnMgr struct {
gracePeriod time.Duration
peers map[peer.ID]*peerInfo

plk sync.RWMutex
protected map[peer.ID]struct{}
raulk marked this conversation as resolved.
Show resolved Hide resolved

// channel-based semaphore that enforces only a single trim is in progress
trimRunningCh chan struct{}
lastTrim time.Time
silencePeriod time.Duration
raulk marked this conversation as resolved.
Show resolved Hide resolved
}

var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil)
Expand All @@ -52,9 +56,23 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
gracePeriod: grace,
peers: make(map[peer.ID]*peerInfo),
trimRunningCh: make(chan struct{}, 1),
protected: make(map[peer.ID]struct{}, 16),
silencePeriod: SilencePeriod,
}
}

func (cm *BasicConnMgr) Protect(id peer.ID) {
cm.plk.Lock()
defer cm.plk.Unlock()
cm.protected[id] = struct{}{}
raulk marked this conversation as resolved.
Show resolved Hide resolved
}

func (cm *BasicConnMgr) Unprotect(id peer.ID) {
cm.plk.Lock()
defer cm.plk.Unlock()
delete(cm.protected, id)
}

// peerInfo stores metadata for a given peer.
type peerInfo struct {
tags map[string]int // value for each tag
Expand All @@ -79,7 +97,7 @@ func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
return
}
defer func() { <-cm.trimRunningCh }()
if time.Since(cm.lastTrim) < silencePeriod {
if time.Since(cm.lastTrim) < cm.silencePeriod {
// skip this attempt to trim as the last one just took place.
return
}
Expand Down Expand Up @@ -110,9 +128,15 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {

var infos []*peerInfo

for _, inf := range cm.peers {
cm.plk.RLock()
for id, inf := range cm.peers {
if _, ok := cm.protected[id]; ok {
// skip protected peer; it's not eligible for pruning.
continue
}
infos = append(infos, inf)
}
cm.plk.RUnlock()

// Sort peers according to their value.
sort.Slice(infos, func(i, j int) bool {
Expand Down
61 changes: 61 additions & 0 deletions connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,64 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total)
}
}

func TestPeerProtection(t *testing.T) {
SilencePeriod = 0
cm := NewConnManager(10, 20, 0)
SilencePeriod = 10 * time.Second

not := cm.Notifee()

// produce 20 connections with unique peers.
var conns []inet.Conn
for i := 0; i < 20; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", 20)
}

// protect the first 5 peers.
var protected []inet.Conn
for _, c := range conns[0:5] {
cm.Protect(c.RemotePeer())
protected = append(protected, c)
// remove the tag to make them even more eligible for pruning.
cm.UntagPeer(c.RemotePeer(), "test")
}

// add one more connection, sending the connection manager overboard.
not.Connected(nil, randConn(t, not.Disconnected))

// the pruning happens in the background -- this timing condition is not good.
time.Sleep(1 * time.Second)

for _, c := range protected {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}

// unprotect the first peer.
cm.Unprotect(protected[0].RemotePeer())

// add 11 more connections, sending the connection manager overboard again.
for i := 0; i < 11; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", 20)
}

// the pruning happens in the background -- this timing condition is not good.
time.Sleep(1 * time.Second)

if !protected[0].(*tconn).closed {
t.Error("unprotected connection was kept open by connection manager")
}
for _, c := range protected[1:] {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}
}