Skip to content

Commit

Permalink
implement decaying tags. (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored May 14, 2020
1 parent 9367628 commit bce720e
Show file tree
Hide file tree
Showing 4 changed files with 679 additions and 26 deletions.
85 changes: 59 additions & 26 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,30 @@ var log = logging.Logger("connmgr")
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
highWater int
lowWater int
connCount int32
gracePeriod time.Duration
segments segments
*decayer

cfg *BasicConnManagerConfig
segments segments

plk sync.RWMutex
protected map[peer.ID]map[string]struct{}

trimTrigger chan chan<- struct{}
// channel-based semaphore that enforces only a single trim is in progress
trimRunningCh chan struct{}
trimTrigger chan chan<- struct{}
connCount int32

lastTrimMu sync.RWMutex
lastTrim time.Time

silencePeriod time.Duration

ctx context.Context
cancel func()
}

var _ connmgr.ConnManager = (*BasicConnMgr)(nil)
var (
_ connmgr.ConnManager = (*BasicConnMgr)(nil)
_ connmgr.Decayer = (*BasicConnMgr)(nil)
)

type segment struct {
sync.Mutex
Expand Down Expand Up @@ -80,6 +83,7 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
temp: true,
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
}
s.peers[p] = pi
Expand All @@ -92,15 +96,32 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
// their connections terminated) until 'low watermark' peers remain.
// * grace is the amount of time a newly opened connection is given before it becomes
// subject to pruning.
func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
func NewConnManager(low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr {
ctx, cancel := context.WithCancel(context.Background())
cm := &BasicConnMgr{

cfg := &BasicConnManagerConfig{
highWater: hi,
lowWater: low,
gracePeriod: grace,
silencePeriod: SilencePeriod,
}

for _, o := range opts {
// TODO we're ignoring errors from options because we have no way to
// return them, or otherwise act on them.
_ = o(cfg)
}

if cfg.decayer == nil {
// Set the default decayer config.
cfg.decayer = (&DecayerCfg{}).WithDefaults()
}

cm := &BasicConnMgr{
cfg: cfg,
trimRunningCh: make(chan struct{}, 1),
trimTrigger: make(chan chan<- struct{}),
protected: make(map[peer.ID]map[string]struct{}, 16),
silencePeriod: SilencePeriod,
ctx: ctx,
cancel: cancel,
segments: func() (ret segments) {
Expand All @@ -113,11 +134,17 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
}(),
}

decay, _ := NewDecayer(cfg.decayer, cm)
cm.decayer = decay

go cm.background()
return cm
}

func (cm *BasicConnMgr) Close() error {
if err := cm.decayer.Close(); err != nil {
return err
}
cm.cancel()
return nil
}
Expand Down Expand Up @@ -151,10 +178,12 @@ func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {

// peerInfo stores metadata for a given peer.
type peerInfo struct {
id peer.ID
tags map[string]int // value for each tag
value int // cached sum of all tag values
temp bool // this is a temporary entry holding early tags, and awaiting connections
id peer.ID
tags map[string]int // value for each tag
decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags

value int // cached sum of all tag values
temp bool // this is a temporary entry holding early tags, and awaiting connections

conns map[network.Conn]time.Time // start time of each connection

Expand Down Expand Up @@ -199,7 +228,7 @@ func (cm *BasicConnMgr) background() {
var waiting chan<- struct{}
select {
case <-ticker.C:
if atomic.LoadInt32(&cm.connCount) < int32(cm.highWater) {
if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
// Below high water, skip.
continue
}
Expand Down Expand Up @@ -235,7 +264,7 @@ func (cm *BasicConnMgr) trim() {
cm.lastTrimMu.RUnlock()

// skip this attempt to trim if the last one just took place.
if time.Since(lastTrim) < cm.silencePeriod {
if time.Since(lastTrim) < cm.cfg.silencePeriod {
return
}

Expand All @@ -256,21 +285,21 @@ func (cm *BasicConnMgr) trim() {
// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
if cm.lowWater == 0 || cm.highWater == 0 {
if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
// disabled
return nil
}

nconns := int(atomic.LoadInt32(&cm.connCount))
if nconns <= cm.lowWater {
if nconns <= cm.cfg.lowWater {
log.Info("open connection count below limit")
return nil
}

npeers := cm.segments.countPeers()
candidates := make([]*peerInfo, 0, npeers)
ncandidates := 0
gracePeriodStart := time.Now().Add(-cm.gracePeriod)
gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)

cm.plk.RLock()
for _, s := range cm.segments {
Expand All @@ -291,7 +320,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
}
cm.plk.RUnlock()

if ncandidates < cm.lowWater {
if ncandidates < cm.cfg.lowWater {
log.Info("open connection count above limit but too many are in the grace period")
// We have too many connections but fewer than lowWater
// connections out of the grace period.
Expand All @@ -311,7 +340,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
return left.value < right.value
})

target := ncandidates - cm.lowWater
target := ncandidates - cm.cfg.lowWater

// slightly overallocate because we may have more than one conns per peer
selected := make([]network.Conn, 0, target+10)
Expand Down Expand Up @@ -363,6 +392,9 @@ func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
for t, v := range pi.tags {
out.Tags[t] = v
}
for t, v := range pi.decaying {
out.Tags[t.name] = v.Value
}
for c, t := range pi.conns {
out.Conns[c.RemoteMultiaddr().String()] = t
}
Expand Down Expand Up @@ -439,10 +471,10 @@ func (cm *BasicConnMgr) GetInfo() CMInfo {
cm.lastTrimMu.RUnlock()

return CMInfo{
HighWater: cm.highWater,
LowWater: cm.lowWater,
HighWater: cm.cfg.highWater,
LowWater: cm.cfg.lowWater,
LastTrim: lastTrim,
GracePeriod: cm.gracePeriod,
GracePeriod: cm.cfg.gracePeriod,
ConnCount: int(atomic.LoadInt32(&cm.connCount)),
}
}
Expand Down Expand Up @@ -478,6 +510,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
id: id,
firstSeen: time.Now(),
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
}
s.peers[id] = pinfo
Expand Down
Loading

0 comments on commit bce720e

Please sign in to comment.