Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

segment the memory peerstore + granular locks #78

Merged
merged 12 commits into from
May 18, 2019
125 changes: 85 additions & 40 deletions pstoremem/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sort"
"sync"
"sync/atomic"
"time"

logging "github.com/ipfs/go-log"
Expand All @@ -26,27 +27,46 @@ func (e *expiringAddr) ExpiredBy(t time.Time) bool {
return t.After(e.Expires)
}

var _ pstore.AddrBook = (*memoryAddrBook)(nil)
type segments [256]*segment

type segment struct {
size uint32

lk sync.RWMutex
addrs map[peer.ID]map[string]*expiringAddr
}

func (s *segments) get(id peer.ID) *segment {
b := []byte(id)
return s[b[len(b)-1]%byte(255)]
}

// memoryAddrBook manages addresses.
type memoryAddrBook struct {
addrmu sync.RWMutex
// Use pointers to save memory. Maps always leave some fraction of their
// space unused. storing the *values* directly in the map will
// drastically increase the space waste. In our case, by 6x.
addrs map[peer.ID]map[string]*expiringAddr
segments segments

ctx context.Context
cancel func()

subManager *AddrSubManager
}

var _ pstore.AddrBook = (*memoryAddrBook)(nil)

func NewAddrBook() pstore.AddrBook {
ctx, cancel := context.WithCancel(context.Background())

ab := &memoryAddrBook{
addrs: make(map[peer.ID]map[string]*expiringAddr),
segments: func() (ret segments) {
for i, _ := range ret {
ret[i] = &segment{addrs: make(map[peer.ID]map[string]*expiringAddr)}
}
return ret
}(),
subManager: NewAddrSubManager(),
ctx: ctx,
cancel: cancel,
Expand Down Expand Up @@ -79,29 +99,37 @@ func (mab *memoryAddrBook) Close() error {

// gc garbage collects the in-memory address book.
func (mab *memoryAddrBook) gc() {
mab.addrmu.Lock()
defer mab.addrmu.Unlock()

now := time.Now()
for p, amap := range mab.addrs {
for k, addr := range amap {
if addr.ExpiredBy(now) {
delete(amap, k)
for _, s := range mab.segments {
s.lk.Lock()
for p, amap := range s.addrs {
for k, addr := range amap {
if addr.ExpiredBy(now) {
delete(amap, k)
}
}
if len(amap) == 0 {
delete(s.addrs, p)
vyzo marked this conversation as resolved.
Show resolved Hide resolved
}
}
if len(amap) == 0 {
delete(mab.addrs, p)
}
s.lk.Unlock()
}

}

func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice {
mab.addrmu.RLock()
defer mab.addrmu.RUnlock()
var length uint32
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimizing for this case probably isn't worth it. This function isn't likely to be called frequently so we can probably just allocate the peer ID slice as needed and drop the atomic length updating logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

for _, s := range mab.segments {
length += atomic.LoadUint32(&s.size)
}

pids := make(peer.IDSlice, 0, len(mab.addrs))
for pid := range mab.addrs {
pids = append(pids, pid)
pids := make(peer.IDSlice, 0, length)
for _, s := range mab.segments {
s.lk.RLock()
for pid, _ := range s.addrs {
pids = append(pids, pid)
}
s.lk.RUnlock()
}
return pids
}
Expand All @@ -115,18 +143,22 @@ func (mab *memoryAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati
// (time-to-live), after which the address is no longer valid.
// If the manager has a longer TTL, the operation is a no-op for that address
func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
mab.addrmu.Lock()
defer mab.addrmu.Unlock()

// if ttl is zero, exit. nothing to do.
if ttl <= 0 {
return
}

amap := mab.addrs[p]
s := mab.segments.get(p)
s.lk.Lock()
defer s.lk.Unlock()

// update the segment size
defer atomic.StoreUint32(&s.size, uint32(len(s.addrs)))

amap := s.addrs[p]
if amap == nil {
amap = make(map[string]*expiringAddr, len(addrs))
mab.addrs[p] = amap
s.addrs[p] = amap
}
exp := time.Now().Add(ttl)
for _, addr := range addrs {
Expand All @@ -152,13 +184,17 @@ func (mab *memoryAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
// This is used when we receive the best estimate of the validity of an address.
func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
mab.addrmu.Lock()
defer mab.addrmu.Unlock()
s := mab.segments.get(p)
s.lk.Lock()
defer s.lk.Unlock()

// update the segment size
defer atomic.StoreUint32(&s.size, uint32(len(s.addrs)))

amap := mab.addrs[p]
amap := s.addrs[p]
if amap == nil {
amap = make(map[string]*expiringAddr, len(addrs))
mab.addrs[p] = amap
s.addrs[p] = amap
}

exp := time.Now().Add(ttl)
Expand All @@ -172,7 +208,6 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du

if ttl > 0 {
amap[addrstr] = &expiringAddr{Addr: addr, Expires: exp, TTL: ttl}

mab.subManager.BroadcastAddr(p, addr)
} else {
delete(amap, addrstr)
Expand All @@ -183,10 +218,14 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
// UpdateAddrs updates the addresses associated with the given peer that have
// the given oldTTL to have the given newTTL.
func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
mab.addrmu.Lock()
defer mab.addrmu.Unlock()
s := mab.segments.get(p)
s.lk.Lock()
defer s.lk.Unlock()

// update the segment size
defer atomic.StoreUint32(&s.size, uint32(len(s.addrs)))

amap, found := mab.addrs[p]
amap, found := s.addrs[p]
if !found {
return
}
Expand All @@ -203,10 +242,11 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t

// Addresses returns all known (and valid) addresses for a given
func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
mab.addrmu.RLock()
defer mab.addrmu.RUnlock()
s := mab.segments.get(p)
s.lk.RLock()
defer s.lk.RUnlock()

amap, found := mab.addrs[p]
amap, found := s.addrs[p]
if !found {
return nil
}
Expand All @@ -224,19 +264,24 @@ func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr {

// ClearAddrs removes all previously stored addresses
func (mab *memoryAddrBook) ClearAddrs(p peer.ID) {
mab.addrmu.Lock()
defer mab.addrmu.Unlock()
s := mab.segments.get(p)
s.lk.Lock()
defer s.lk.Unlock()

// update the segment size
defer atomic.StoreUint32(&s.size, uint32(len(s.addrs)))

delete(mab.addrs, p)
delete(s.addrs, p)
}

// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
mab.addrmu.RLock()
defer mab.addrmu.RUnlock()
s := mab.segments.get(p)
s.lk.RLock()
defer s.lk.RUnlock()

baseaddrslice := mab.addrs[p]
baseaddrslice := s.addrs[p]
initial := make([]ma.Multiaddr, 0, len(baseaddrslice))
for _, a := range baseaddrslice {
initial = append(initial, a.Addr)
Expand Down