From e7e511ba9994e25c42584eebcff85138c9dea52f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 9 May 2019 00:27:58 +0100 Subject: [PATCH 01/12] segment the memory peerstore; granular locks. --- pstoremem/addr_book.go | 125 ++++++++++++++++++++++++++++------------- 1 file changed, 85 insertions(+), 40 deletions(-) diff --git a/pstoremem/addr_book.go b/pstoremem/addr_book.go index cb9576a..7aa0c3f 100644 --- a/pstoremem/addr_book.go +++ b/pstoremem/addr_book.go @@ -4,6 +4,7 @@ import ( "context" "sort" "sync" + "sync/atomic" "time" logging "github.com/ipfs/go-log" @@ -26,7 +27,19 @@ func (e *expiringAddr) ExpiredBy(t time.Time) bool { return t.After(e.Expires) } -var _ pstore.AddrBook = (*memoryAddrBook)(nil) +type segments [256]*segment + +type segment struct { + size uint32 + + lk sync.RWMutex + addrs map[peer.ID]map[string]*expiringAddr +} + +func (s *segments) get(id peer.ID) *segment { + b := []byte(id) + return s[b[len(b)-1]%byte(255)] +} // memoryAddrBook manages addresses. type memoryAddrBook struct { @@ -34,7 +47,7 @@ type memoryAddrBook struct { // Use pointers to save memory. Maps always leave some fraction of their // space unused. storing the *values* directly in the map will // drastically increase the space waste. In our case, by 6x. - addrs map[peer.ID]map[string]*expiringAddr + segments segments ctx context.Context cancel func() @@ -42,11 +55,18 @@ type memoryAddrBook struct { subManager *AddrSubManager } +var _ pstore.AddrBook = (*memoryAddrBook)(nil) + func NewAddrBook() pstore.AddrBook { ctx, cancel := context.WithCancel(context.Background()) ab := &memoryAddrBook{ - addrs: make(map[peer.ID]map[string]*expiringAddr), + segments: func() (ret segments) { + for i, _ := range ret { + ret[i] = &segment{addrs: make(map[peer.ID]map[string]*expiringAddr)} + } + return ret + }(), subManager: NewAddrSubManager(), ctx: ctx, cancel: cancel, @@ -79,29 +99,37 @@ func (mab *memoryAddrBook) Close() error { // gc garbage collects the in-memory address book. func (mab *memoryAddrBook) gc() { - mab.addrmu.Lock() - defer mab.addrmu.Unlock() - now := time.Now() - for p, amap := range mab.addrs { - for k, addr := range amap { - if addr.ExpiredBy(now) { - delete(amap, k) + for _, s := range mab.segments { + s.lk.Lock() + for p, amap := range s.addrs { + for k, addr := range amap { + if addr.ExpiredBy(now) { + delete(amap, k) + } + } + if len(amap) == 0 { + delete(s.addrs, p) } } - if len(amap) == 0 { - delete(mab.addrs, p) - } + s.lk.Unlock() } + } func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice { - mab.addrmu.RLock() - defer mab.addrmu.RUnlock() + var length uint32 + for _, s := range mab.segments { + length += atomic.LoadUint32(&s.size) + } - pids := make(peer.IDSlice, 0, len(mab.addrs)) - for pid := range mab.addrs { - pids = append(pids, pid) + pids := make(peer.IDSlice, 0, length) + for _, s := range mab.segments { + s.lk.RLock() + for pid, _ := range s.addrs { + pids = append(pids, pid) + } + s.lk.RUnlock() } return pids } @@ -115,18 +143,22 @@ func (mab *memoryAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati // (time-to-live), after which the address is no longer valid. // If the manager has a longer TTL, the operation is a no-op for that address func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - mab.addrmu.Lock() - defer mab.addrmu.Unlock() - // if ttl is zero, exit. nothing to do. if ttl <= 0 { return } - amap := mab.addrs[p] + s := mab.segments.get(p) + s.lk.Lock() + defer s.lk.Unlock() + + // update the segment size + defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) + + amap := s.addrs[p] if amap == nil { amap = make(map[string]*expiringAddr, len(addrs)) - mab.addrs[p] = amap + s.addrs[p] = amap } exp := time.Now().Add(ttl) for _, addr := range addrs { @@ -152,13 +184,17 @@ func (mab *memoryAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati // SetAddrs sets the ttl on addresses. This clears any TTL there previously. // This is used when we receive the best estimate of the validity of an address. func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - mab.addrmu.Lock() - defer mab.addrmu.Unlock() + s := mab.segments.get(p) + s.lk.Lock() + defer s.lk.Unlock() + + // update the segment size + defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) - amap := mab.addrs[p] + amap := s.addrs[p] if amap == nil { amap = make(map[string]*expiringAddr, len(addrs)) - mab.addrs[p] = amap + s.addrs[p] = amap } exp := time.Now().Add(ttl) @@ -172,7 +208,6 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du if ttl > 0 { amap[addrstr] = &expiringAddr{Addr: addr, Expires: exp, TTL: ttl} - mab.subManager.BroadcastAddr(p, addr) } else { delete(amap, addrstr) @@ -183,10 +218,14 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du // UpdateAddrs updates the addresses associated with the given peer that have // the given oldTTL to have the given newTTL. func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { - mab.addrmu.Lock() - defer mab.addrmu.Unlock() + s := mab.segments.get(p) + s.lk.Lock() + defer s.lk.Unlock() + + // update the segment size + defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) - amap, found := mab.addrs[p] + amap, found := s.addrs[p] if !found { return } @@ -203,10 +242,11 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t // Addresses returns all known (and valid) addresses for a given func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { - mab.addrmu.RLock() - defer mab.addrmu.RUnlock() + s := mab.segments.get(p) + s.lk.RLock() + defer s.lk.RUnlock() - amap, found := mab.addrs[p] + amap, found := s.addrs[p] if !found { return nil } @@ -224,19 +264,24 @@ func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { // ClearAddrs removes all previously stored addresses func (mab *memoryAddrBook) ClearAddrs(p peer.ID) { - mab.addrmu.Lock() - defer mab.addrmu.Unlock() + s := mab.segments.get(p) + s.lk.Lock() + defer s.lk.Unlock() + + // update the segment size + defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) - delete(mab.addrs, p) + delete(s.addrs, p) } // AddrStream returns a channel on which all new addresses discovered for a // given peer ID will be published. func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { - mab.addrmu.RLock() - defer mab.addrmu.RUnlock() + s := mab.segments.get(p) + s.lk.RLock() + defer s.lk.RUnlock() - baseaddrslice := mab.addrs[p] + baseaddrslice := s.addrs[p] initial := make([]ma.Multiaddr, 0, len(baseaddrslice)) for _, a := range baseaddrslice { initial = append(initial, a.Addr) From 303b1b6207886cf69c7a288bf51b1d731ff83a4e Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 9 May 2019 11:20:12 +0300 Subject: [PATCH 02/12] update segment size on gc --- pstoremem/addr_book.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pstoremem/addr_book.go b/pstoremem/addr_book.go index 7aa0c3f..cd7a920 100644 --- a/pstoremem/addr_book.go +++ b/pstoremem/addr_book.go @@ -112,6 +112,7 @@ func (mab *memoryAddrBook) gc() { delete(s.addrs, p) } } + atomic.StoreUint32(&s.size, uint32(len(s.addrs))) s.lk.Unlock() } From ea23986270ada129dd0a1ee6f51eb0fd8bef02a7 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 9 May 2019 12:14:03 +0300 Subject: [PATCH 03/12] segmented protocol info --- peerstore.go | 145 ++++++++++++++++++++++++++------------------------- 1 file changed, 74 insertions(+), 71 deletions(-) diff --git a/peerstore.go b/peerstore.go index f7f87b8..fde9cd4 100644 --- a/peerstore.go +++ b/peerstore.go @@ -10,8 +10,38 @@ import ( var _ Peerstore = (*peerstore)(nil) -const maxInternedProtocols = 512 -const maxInternedProtocolSize = 256 +const maxInternedProtocols = 64 +const maxInternedProtocolSize = 128 + +type segment struct { + lk sync.RWMutex + interned map[string]string + protocols map[peer.ID]map[string]struct{} +} + +type segments [256]*segment + +func (s *segments) get(id peer.ID) *segment { + b := []byte(id) + return s[b[len(b)-1]] +} + +func (s *segment) internProtocol(proto string) string { + if len(proto) > maxInternedProtocolSize { + return proto + } + + if interned, ok := s.interned[proto]; ok { + return interned + } + + if len(s.interned) >= maxInternedProtocols { + s.interned = make(map[string]string, maxInternedProtocols) + } + + s.interned[proto] = proto + return proto +} type peerstore struct { Metrics @@ -20,20 +50,27 @@ type peerstore struct { AddrBook PeerMetadata - // lock for protocol information, separate from datastore lock - protolock sync.RWMutex - internedProtocols map[string]string + // segments for protocol information + segments segments } // NewPeerstore creates a data structure that stores peer data, backed by the // supplied implementations of KeyBook, AddrBook and PeerMetadata. func NewPeerstore(kb KeyBook, ab AddrBook, md PeerMetadata) Peerstore { return &peerstore{ - KeyBook: kb, - AddrBook: ab, - PeerMetadata: md, - Metrics: NewMetrics(), - internedProtocols: make(map[string]string), + KeyBook: kb, + AddrBook: ab, + PeerMetadata: md, + Metrics: NewMetrics(), + segments: func() (ret segments) { + for i := range ret { + ret[i] = &segment{ + interned: make(map[string]string), + protocols: make(map[peer.ID]map[string]struct{}), + } + } + return ret + }(), } } @@ -80,77 +117,46 @@ func (ps *peerstore) PeerInfo(p peer.ID) PeerInfo { } } -func (ps *peerstore) internProtocol(s string) string { - if len(s) > maxInternedProtocolSize { - return s - } - - if interned, ok := ps.internedProtocols[s]; ok { - return interned - } - - if len(ps.internedProtocols) >= maxInternedProtocols { - ps.internedProtocols = make(map[string]string, maxInternedProtocols) - } - - ps.internedProtocols[s] = s - return s -} - func (ps *peerstore) SetProtocols(p peer.ID, protos ...string) error { - ps.protolock.Lock() - defer ps.protolock.Unlock() + s := ps.segments.get(p) + s.lk.Lock() + defer s.lk.Unlock() - protomap := make(map[string]struct{}, len(protos)) + newprotos := make(map[string]struct{}, len(protos)) for _, proto := range protos { - protomap[ps.internProtocol(proto)] = struct{}{} + newprotos[s.internProtocol(proto)] = struct{}{} } - return ps.Put(p, "protocols", protomap) + s.protocols[p] = newprotos + + return nil } func (ps *peerstore) AddProtocols(p peer.ID, protos ...string) error { - ps.protolock.Lock() - defer ps.protolock.Unlock() - protomap, err := ps.getProtocolMap(p) - if err != nil { - return err + s := ps.segments.get(p) + s.lk.Lock() + defer s.lk.Unlock() + + protomap, ok := s.protocols[p] + if !ok { + protomap = make(map[string]struct{}) + s.protocols[p] = protomap } for _, proto := range protos { - protomap[ps.internProtocol(proto)] = struct{}{} + protomap[s.internProtocol(proto)] = struct{}{} } - return ps.Put(p, "protocols", protomap) -} - -func (ps *peerstore) getProtocolMap(p peer.ID) (map[string]struct{}, error) { - iprotomap, err := ps.Get(p, "protocols") - switch err { - default: - return nil, err - case ErrNotFound: - return make(map[string]struct{}), nil - case nil: - cast, ok := iprotomap.(map[string]struct{}) - if !ok { - return nil, fmt.Errorf("stored protocol set was not a map") - } - - return cast, nil - } + return nil } func (ps *peerstore) GetProtocols(p peer.ID) ([]string, error) { - ps.protolock.RLock() - defer ps.protolock.RUnlock() - pmap, err := ps.getProtocolMap(p) - if err != nil { - return nil, err - } + s := ps.segments.get(p) + s.lk.RLock() + defer s.lk.RUnlock() - out := make([]string, 0, len(pmap)) - for k := range pmap { + out := make([]string, 0, len(s.protocols)) + for k := range s.protocols[p] { out = append(out, k) } @@ -158,16 +164,13 @@ func (ps *peerstore) GetProtocols(p peer.ID) ([]string, error) { } func (ps *peerstore) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) { - ps.protolock.RLock() - defer ps.protolock.RUnlock() - pmap, err := ps.getProtocolMap(p) - if err != nil { - return nil, err - } + s := ps.segments.get(p) + s.lk.RLock() + defer s.lk.RUnlock() out := make([]string, 0, len(protos)) for _, proto := range protos { - if _, ok := pmap[proto]; ok { + if _, ok := s.protocols[p][proto]; ok { out = append(out, proto) } } From 08caa87351233894e20a410dcd12fa73fabd562e Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 17 May 2019 12:33:33 +0300 Subject: [PATCH 04/12] refactor protocol functionality into ProtoBook --- interface.go | 14 +++-- peerstore.go | 112 ++----------------------------------- pstoremem/addr_book.go | 14 ++--- pstoremem/peerstore.go | 1 + pstoremem/protobook.go | 123 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 144 insertions(+), 120 deletions(-) create mode 100644 pstoremem/protobook.go diff --git a/interface.go b/interface.go index c669ba9..e3d988c 100644 --- a/interface.go +++ b/interface.go @@ -56,17 +56,13 @@ type Peerstore interface { KeyBook PeerMetadata Metrics + ProtoBook // PeerInfo returns a peer.PeerInfo struct for given peer.ID. // This is a small slice of the information Peerstore has on // that peer, useful to other services. PeerInfo(peer.ID) PeerInfo - GetProtocols(peer.ID) ([]string, error) - AddProtocols(peer.ID, ...string) error - SetProtocols(peer.ID, ...string) error - SupportsProtocols(peer.ID, ...string) ([]string, error) - // Peers returns all of the peer IDs stored across all inner stores. Peers() peer.IDSlice } @@ -142,3 +138,11 @@ type KeyBook interface { // PeersWithKeys returns all the peer IDs stored in the KeyBook PeersWithKeys() peer.IDSlice } + +// ProtoBook tracks the protocols supported by peers +type ProtoBook interface { + GetProtocols(peer.ID) ([]string, error) + AddProtocols(peer.ID, ...string) error + SetProtocols(peer.ID, ...string) error + SupportsProtocols(peer.ID, ...string) ([]string, error) +} diff --git a/peerstore.go b/peerstore.go index fde9cd4..ea29a17 100644 --- a/peerstore.go +++ b/peerstore.go @@ -3,74 +3,30 @@ package peerstore import ( "fmt" "io" - "sync" peer "github.com/libp2p/go-libp2p-peer" ) var _ Peerstore = (*peerstore)(nil) -const maxInternedProtocols = 64 -const maxInternedProtocolSize = 128 - -type segment struct { - lk sync.RWMutex - interned map[string]string - protocols map[peer.ID]map[string]struct{} -} - -type segments [256]*segment - -func (s *segments) get(id peer.ID) *segment { - b := []byte(id) - return s[b[len(b)-1]] -} - -func (s *segment) internProtocol(proto string) string { - if len(proto) > maxInternedProtocolSize { - return proto - } - - if interned, ok := s.interned[proto]; ok { - return interned - } - - if len(s.interned) >= maxInternedProtocols { - s.interned = make(map[string]string, maxInternedProtocols) - } - - s.interned[proto] = proto - return proto -} - type peerstore struct { Metrics KeyBook AddrBook + ProtoBook PeerMetadata - - // segments for protocol information - segments segments } // NewPeerstore creates a data structure that stores peer data, backed by the // supplied implementations of KeyBook, AddrBook and PeerMetadata. -func NewPeerstore(kb KeyBook, ab AddrBook, md PeerMetadata) Peerstore { +func NewPeerstore(kb KeyBook, ab AddrBook, pb ProtoBook, md PeerMetadata) Peerstore { return &peerstore{ KeyBook: kb, AddrBook: ab, + ProtoBook: pb, PeerMetadata: md, Metrics: NewMetrics(), - segments: func() (ret segments) { - for i := range ret { - ret[i] = &segment{ - interned: make(map[string]string), - protocols: make(map[peer.ID]map[string]struct{}), - } - } - return ret - }(), } } @@ -86,6 +42,7 @@ func (ps *peerstore) Close() (err error) { weakClose("keybook", ps.KeyBook) weakClose("addressbook", ps.AddrBook) + weakClose("protobook", ps.ProtoBook) weakClose("peermetadata", ps.PeerMetadata) if len(errs) > 0 { @@ -117,67 +74,6 @@ func (ps *peerstore) PeerInfo(p peer.ID) PeerInfo { } } -func (ps *peerstore) SetProtocols(p peer.ID, protos ...string) error { - s := ps.segments.get(p) - s.lk.Lock() - defer s.lk.Unlock() - - newprotos := make(map[string]struct{}, len(protos)) - for _, proto := range protos { - newprotos[s.internProtocol(proto)] = struct{}{} - } - - s.protocols[p] = newprotos - - return nil -} - -func (ps *peerstore) AddProtocols(p peer.ID, protos ...string) error { - s := ps.segments.get(p) - s.lk.Lock() - defer s.lk.Unlock() - - protomap, ok := s.protocols[p] - if !ok { - protomap = make(map[string]struct{}) - s.protocols[p] = protomap - } - - for _, proto := range protos { - protomap[s.internProtocol(proto)] = struct{}{} - } - - return nil -} - -func (ps *peerstore) GetProtocols(p peer.ID) ([]string, error) { - s := ps.segments.get(p) - s.lk.RLock() - defer s.lk.RUnlock() - - out := make([]string, 0, len(s.protocols)) - for k := range s.protocols[p] { - out = append(out, k) - } - - return out, nil -} - -func (ps *peerstore) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) { - s := ps.segments.get(p) - s.lk.RLock() - defer s.lk.RUnlock() - - out := make([]string, 0, len(protos)) - for _, proto := range protos { - if _, ok := s.protocols[p][proto]; ok { - out = append(out, proto) - } - } - - return out, nil -} - func PeerInfos(ps Peerstore, peers peer.IDSlice) []PeerInfo { pi := make([]PeerInfo, len(peers)) for i, p := range peers { diff --git a/pstoremem/addr_book.go b/pstoremem/addr_book.go index cd7a920..6baee9e 100644 --- a/pstoremem/addr_book.go +++ b/pstoremem/addr_book.go @@ -27,18 +27,18 @@ func (e *expiringAddr) ExpiredBy(t time.Time) bool { return t.After(e.Expires) } -type segments [256]*segment +type addrSegments [256]*addrSegment -type segment struct { +type addrSegment struct { size uint32 lk sync.RWMutex addrs map[peer.ID]map[string]*expiringAddr } -func (s *segments) get(id peer.ID) *segment { +func (s *addrSegments) get(id peer.ID) *addrSegment { b := []byte(id) - return s[b[len(b)-1]%byte(255)] + return s[b[len(b)-1]] } // memoryAddrBook manages addresses. @@ -47,7 +47,7 @@ type memoryAddrBook struct { // Use pointers to save memory. Maps always leave some fraction of their // space unused. storing the *values* directly in the map will // drastically increase the space waste. In our case, by 6x. - segments segments + segments addrSegments ctx context.Context cancel func() @@ -61,9 +61,9 @@ func NewAddrBook() pstore.AddrBook { ctx, cancel := context.WithCancel(context.Background()) ab := &memoryAddrBook{ - segments: func() (ret segments) { + segments: func() (ret addrSegments) { for i, _ := range ret { - ret[i] = &segment{addrs: make(map[peer.ID]map[string]*expiringAddr)} + ret[i] = &addrSegment{addrs: make(map[peer.ID]map[string]*expiringAddr)} } return ret }(), diff --git a/pstoremem/peerstore.go b/pstoremem/peerstore.go index 7d87313..c7cbd67 100644 --- a/pstoremem/peerstore.go +++ b/pstoremem/peerstore.go @@ -7,5 +7,6 @@ func NewPeerstore() pstore.Peerstore { return pstore.NewPeerstore( NewKeyBook(), NewAddrBook(), + NewProtoBook(), NewPeerMetadata()) } diff --git a/pstoremem/protobook.go b/pstoremem/protobook.go new file mode 100644 index 0000000..9074696 --- /dev/null +++ b/pstoremem/protobook.go @@ -0,0 +1,123 @@ +package pstoremem + +import ( + "sync" + + peer "github.com/libp2p/go-libp2p-peer" + + pstore "github.com/libp2p/go-libp2p-peerstore" +) + +const maxInternedProtocols = 64 +const maxInternedProtocolSize = 128 + +type protoSegment struct { + lk sync.RWMutex + interned map[string]string + protocols map[peer.ID]map[string]struct{} +} + +type protoSegments [256]*protoSegment + +func (s *protoSegments) get(id peer.ID) *protoSegment { + b := []byte(id) + return s[b[len(b)-1]] +} + +func (s *protoSegment) internProtocol(proto string) string { + if len(proto) > maxInternedProtocolSize { + return proto + } + + if interned, ok := s.interned[proto]; ok { + return interned + } + + if len(s.interned) >= maxInternedProtocols { + s.interned = make(map[string]string, maxInternedProtocols) + } + + s.interned[proto] = proto + return proto +} + +type memoryProtoBook struct { + segments protoSegments +} + +var _ pstore.ProtoBook = (*memoryProtoBook)(nil) + +func NewProtoBook() pstore.ProtoBook { + return &memoryProtoBook{ + segments: func() (ret protoSegments) { + for i := range ret { + ret[i] = &protoSegment{ + interned: make(map[string]string), + protocols: make(map[peer.ID]map[string]struct{}), + } + } + return ret + }(), + } +} + +func (pb *memoryProtoBook) SetProtocols(p peer.ID, protos ...string) error { + s := pb.segments.get(p) + s.lk.Lock() + defer s.lk.Unlock() + + newprotos := make(map[string]struct{}, len(protos)) + for _, proto := range protos { + newprotos[s.internProtocol(proto)] = struct{}{} + } + + s.protocols[p] = newprotos + + return nil +} + +func (pb *memoryProtoBook) AddProtocols(p peer.ID, protos ...string) error { + s := pb.segments.get(p) + s.lk.Lock() + defer s.lk.Unlock() + + protomap, ok := s.protocols[p] + if !ok { + protomap = make(map[string]struct{}) + s.protocols[p] = protomap + } + + for _, proto := range protos { + protomap[s.internProtocol(proto)] = struct{}{} + } + + return nil +} + +func (pb *memoryProtoBook) GetProtocols(p peer.ID) ([]string, error) { + s := pb.segments.get(p) + s.lk.RLock() + defer s.lk.RUnlock() + + out := make([]string, 0, len(s.protocols)) + for k := range s.protocols[p] { + out = append(out, k) + } + + return out, nil +} + +func (pb *memoryProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) { + s := pb.segments.get(p) + s.lk.RLock() + defer s.lk.RUnlock() + + out := make([]string, 0, len(protos)) + for _, proto := range protos { + if _, ok := s.protocols[p][proto]; ok { + out = append(out, proto) + } + } + + return out, nil +} From 5ed17f0b79a01d819635273361357eddfdb50afa Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 17 May 2019 12:37:46 +0300 Subject: [PATCH 05/12] embed lock in segment --- pstoremem/addr_book.go | 43 +++++++++++++++++++++--------------------- pstoremem/protobook.go | 18 +++++++++--------- 2 files changed, 30 insertions(+), 31 deletions(-) diff --git a/pstoremem/addr_book.go b/pstoremem/addr_book.go index 6baee9e..8ea785d 100644 --- a/pstoremem/addr_book.go +++ b/pstoremem/addr_book.go @@ -30,10 +30,13 @@ func (e *expiringAddr) ExpiredBy(t time.Time) bool { type addrSegments [256]*addrSegment type addrSegment struct { - size uint32 + sync.RWMutex - lk sync.RWMutex + // Use pointers to save memory. Maps always leave some fraction of their + // space unused. storing the *values* directly in the map will + // drastically increase the space waste. In our case, by 6x. addrs map[peer.ID]map[string]*expiringAddr + size uint32 } func (s *addrSegments) get(id peer.ID) *addrSegment { @@ -43,10 +46,6 @@ func (s *addrSegments) get(id peer.ID) *addrSegment { // memoryAddrBook manages addresses. type memoryAddrBook struct { - addrmu sync.RWMutex - // Use pointers to save memory. Maps always leave some fraction of their - // space unused. storing the *values* directly in the map will - // drastically increase the space waste. In our case, by 6x. segments addrSegments ctx context.Context @@ -101,7 +100,7 @@ func (mab *memoryAddrBook) Close() error { func (mab *memoryAddrBook) gc() { now := time.Now() for _, s := range mab.segments { - s.lk.Lock() + s.Lock() for p, amap := range s.addrs { for k, addr := range amap { if addr.ExpiredBy(now) { @@ -113,7 +112,7 @@ func (mab *memoryAddrBook) gc() { } } atomic.StoreUint32(&s.size, uint32(len(s.addrs))) - s.lk.Unlock() + s.Unlock() } } @@ -126,11 +125,11 @@ func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice { pids := make(peer.IDSlice, 0, length) for _, s := range mab.segments { - s.lk.RLock() + s.RLock() for pid, _ := range s.addrs { pids = append(pids, pid) } - s.lk.RUnlock() + s.RUnlock() } return pids } @@ -150,8 +149,8 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du } s := mab.segments.get(p) - s.lk.Lock() - defer s.lk.Unlock() + s.Lock() + defer s.Unlock() // update the segment size defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) @@ -186,8 +185,8 @@ func (mab *memoryAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati // This is used when we receive the best estimate of the validity of an address. func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { s := mab.segments.get(p) - s.lk.Lock() - defer s.lk.Unlock() + s.Lock() + defer s.Unlock() // update the segment size defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) @@ -220,8 +219,8 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du // the given oldTTL to have the given newTTL. func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { s := mab.segments.get(p) - s.lk.Lock() - defer s.lk.Unlock() + s.Lock() + defer s.Unlock() // update the segment size defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) @@ -244,8 +243,8 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t // Addresses returns all known (and valid) addresses for a given func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { s := mab.segments.get(p) - s.lk.RLock() - defer s.lk.RUnlock() + s.RLock() + defer s.RUnlock() amap, found := s.addrs[p] if !found { @@ -266,8 +265,8 @@ func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { // ClearAddrs removes all previously stored addresses func (mab *memoryAddrBook) ClearAddrs(p peer.ID) { s := mab.segments.get(p) - s.lk.Lock() - defer s.lk.Unlock() + s.Lock() + defer s.Unlock() // update the segment size defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) @@ -279,8 +278,8 @@ func (mab *memoryAddrBook) ClearAddrs(p peer.ID) { // given peer ID will be published. func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { s := mab.segments.get(p) - s.lk.RLock() - defer s.lk.RUnlock() + s.RLock() + defer s.RUnlock() baseaddrslice := s.addrs[p] initial := make([]ma.Multiaddr, 0, len(baseaddrslice)) diff --git a/pstoremem/protobook.go b/pstoremem/protobook.go index 9074696..e9ca4d7 100644 --- a/pstoremem/protobook.go +++ b/pstoremem/protobook.go @@ -12,7 +12,7 @@ const maxInternedProtocols = 64 const maxInternedProtocolSize = 128 type protoSegment struct { - lk sync.RWMutex + sync.RWMutex interned map[string]string protocols map[peer.ID]map[string]struct{} } @@ -63,8 +63,8 @@ func NewProtoBook() pstore.ProtoBook { func (pb *memoryProtoBook) SetProtocols(p peer.ID, protos ...string) error { s := pb.segments.get(p) - s.lk.Lock() - defer s.lk.Unlock() + s.Lock() + defer s.Unlock() newprotos := make(map[string]struct{}, len(protos)) for _, proto := range protos { @@ -78,8 +78,8 @@ func (pb *memoryProtoBook) SetProtocols(p peer.ID, protos ...string) error { func (pb *memoryProtoBook) AddProtocols(p peer.ID, protos ...string) error { s := pb.segments.get(p) - s.lk.Lock() - defer s.lk.Unlock() + s.Lock() + defer s.Unlock() protomap, ok := s.protocols[p] if !ok { @@ -96,8 +96,8 @@ func (pb *memoryProtoBook) AddProtocols(p peer.ID, protos ...string) error { func (pb *memoryProtoBook) GetProtocols(p peer.ID) ([]string, error) { s := pb.segments.get(p) - s.lk.RLock() - defer s.lk.RUnlock() + s.RLock() + defer s.RUnlock() out := make([]string, 0, len(s.protocols)) for k := range s.protocols[p] { @@ -109,8 +109,8 @@ func (pb *memoryProtoBook) GetProtocols(p peer.ID) ([]string, error) { func (pb *memoryProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) { s := pb.segments.get(p) - s.lk.RLock() - defer s.lk.RUnlock() + s.RLock() + defer s.RUnlock() out := make([]string, 0, len(protos)) for _, proto := range protos { From a9e3f07759c0c51232e5e406360ade2c5bf5506c Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 17 May 2019 12:47:07 +0300 Subject: [PATCH 06/12] global interned protocol table --- pstoremem/protobook.go | 66 +++++++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/pstoremem/protobook.go b/pstoremem/protobook.go index e9ca4d7..e401701 100644 --- a/pstoremem/protobook.go +++ b/pstoremem/protobook.go @@ -8,12 +8,13 @@ import ( pstore "github.com/libp2p/go-libp2p-peerstore" ) -const maxInternedProtocols = 64 -const maxInternedProtocolSize = 128 +const ( + maxInternedProtocols = 512 + maxInternedProtocolSize = 256 +) type protoSegment struct { sync.RWMutex - interned map[string]string protocols map[peer.ID]map[string]struct{} } @@ -24,35 +25,21 @@ func (s *protoSegments) get(id peer.ID) *protoSegment { return s[b[len(b)-1]] } -func (s *protoSegment) internProtocol(proto string) string { - if len(proto) > maxInternedProtocolSize { - return proto - } - - if interned, ok := s.interned[proto]; ok { - return interned - } - - if len(s.interned) >= maxInternedProtocols { - s.interned = make(map[string]string, maxInternedProtocols) - } - - s.interned[proto] = proto - return proto -} - type memoryProtoBook struct { segments protoSegments + + lk sync.RWMutex + interned map[string]string } var _ pstore.ProtoBook = (*memoryProtoBook)(nil) func NewProtoBook() pstore.ProtoBook { return &memoryProtoBook{ + interned: make(map[string]string, maxInternedProtocols), segments: func() (ret protoSegments) { for i := range ret { ret[i] = &protoSegment{ - interned: make(map[string]string), protocols: make(map[peer.ID]map[string]struct{}), } } @@ -61,6 +48,39 @@ func NewProtoBook() pstore.ProtoBook { } } +func (pb *memoryProtoBook) internProtocol(proto string) string { + if len(proto) > maxInternedProtocolSize { + return proto + } + + // check if it is interned with the read lock + pb.lk.RLock() + interned, ok := pb.interned[proto] + pb.lk.RUnlock() + + if ok { + return interned + } + + // intern with the write lock + pb.lk.Lock() + defer pb.lk.Unlock() + + // check again in case it got interned in between locks + interned, ok = pb.interned[proto] + if ok { + return interned + } + + // if we've filled the table, throw it away and start over + if len(pb.interned) >= maxInternedProtocols { + pb.interned = make(map[string]string, maxInternedProtocols) + } + + pb.interned[proto] = proto + return proto +} + func (pb *memoryProtoBook) SetProtocols(p peer.ID, protos ...string) error { s := pb.segments.get(p) s.Lock() @@ -68,7 +88,7 @@ func (pb *memoryProtoBook) SetProtocols(p peer.ID, protos ...string) error { newprotos := make(map[string]struct{}, len(protos)) for _, proto := range protos { - newprotos[s.internProtocol(proto)] = struct{}{} + newprotos[pb.internProtocol(proto)] = struct{}{} } s.protocols[p] = newprotos @@ -88,7 +108,7 @@ func (pb *memoryProtoBook) AddProtocols(p peer.ID, protos ...string) error { } for _, proto := range protos { - protomap[s.internProtocol(proto)] = struct{}{} + protomap[pb.internProtocol(proto)] = struct{}{} } return nil From 3140aaa2c30593ba6d8bcff6d57aefd08eaf4af9 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 17 May 2019 13:13:18 +0300 Subject: [PATCH 07/12] implement protobook for pstoreds --- pstoreds/peerstore.go | 4 +- pstoreds/protobook.go | 122 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 pstoreds/protobook.go diff --git a/pstoreds/peerstore.go b/pstoreds/peerstore.go index be7b09e..b4560f7 100644 --- a/pstoreds/peerstore.go +++ b/pstoreds/peerstore.go @@ -63,7 +63,9 @@ func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (pstore. return nil, err } - ps := pstore.NewPeerstore(keyBook, addrBook, peerMetadata) + protoBook := NewProtoBook(peerMetadata) + + ps := pstore.NewPeerstore(keyBook, addrBook, protoBook, peerMetadata) return ps, nil } diff --git a/pstoreds/protobook.go b/pstoreds/protobook.go new file mode 100644 index 0000000..27184f7 --- /dev/null +++ b/pstoreds/protobook.go @@ -0,0 +1,122 @@ +package pstoreds + +import ( + "fmt" + "sync" + + peer "github.com/libp2p/go-libp2p-peer" + + pstore "github.com/libp2p/go-libp2p-peerstore" +) + +type dsProtoBook struct { + lks [256]sync.RWMutex + meta pstore.PeerMetadata +} + +var _ pstore.ProtoBook = (*dsProtoBook)(nil) + +func NewProtoBook(meta pstore.PeerMetadata) pstore.ProtoBook { + return &dsProtoBook{meta: meta} +} + +func (pb *dsProtoBook) Lock(p peer.ID) { + b := []byte(p) + pb.lks[b[len(b)-1]].Lock() +} + +func (pb *dsProtoBook) Unlock(p peer.ID) { + b := []byte(p) + pb.lks[b[len(b)-1]].Unlock() +} + +func (pb *dsProtoBook) RLock(p peer.ID) { + b := []byte(p) + pb.lks[b[len(b)-1]].RLock() +} + +func (pb *dsProtoBook) RUnlock(p peer.ID) { + b := []byte(p) + pb.lks[b[len(b)-1]].RUnlock() +} + +func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error { + pb.Lock(p) + defer pb.Unlock(p) + + protomap := make(map[string]struct{}, len(protos)) + for _, proto := range protos { + protomap[proto] = struct{}{} + } + + return pb.meta.Put(p, "protocols", protomap) +} + +func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...string) error { + pb.Lock(p) + defer pb.Unlock(p) + + pmap, err := pb.getProtocolMap(p) + if err != nil { + return err + } + + for _, proto := range protos { + pmap[proto] = struct{}{} + } + + return pb.meta.Put(p, "protocols", pmap) +} + +func (pb *dsProtoBook) GetProtocols(p peer.ID) ([]string, error) { + pb.RLock(p) + defer pb.RUnlock(p) + + pmap, err := pb.getProtocolMap(p) + if err != nil { + return nil, err + } + + res := make([]string, 0, len(pmap)) + for proto := range pmap { + res = append(res, proto) + } + + return res, nil +} + +func (pb *dsProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) { + pb.RLock(p) + defer pb.RUnlock(p) + + pmap, err := pb.getProtocolMap(p) + if err != nil { + return nil, err + } + + res := make([]string, 0, len(protos)) + for _, proto := range protos { + if _, ok := pmap[proto]; ok { + res = append(res, proto) + } + } + + return res, nil +} + +func (pb *dsProtoBook) getProtocolMap(p peer.ID) (map[string]struct{}, error) { + iprotomap, err := pb.meta.Get(p, "protocols") + switch err { + default: + return nil, err + case pstore.ErrNotFound: + return make(map[string]struct{}), nil + case nil: + cast, ok := iprotomap.(map[string]struct{}) + if !ok { + return nil, fmt.Errorf("stored protocol set was not a map") + } + + return cast, nil + } +} From d340d2983ef1d0505ebb8a7a9de9c8691c36f7d9 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 17 May 2019 13:14:39 +0300 Subject: [PATCH 08/12] cosmetics --- pstoremem/addr_book.go | 4 ++-- pstoremem/protobook.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pstoremem/addr_book.go b/pstoremem/addr_book.go index 8ea785d..2bc80f1 100644 --- a/pstoremem/addr_book.go +++ b/pstoremem/addr_book.go @@ -39,8 +39,8 @@ type addrSegment struct { size uint32 } -func (s *addrSegments) get(id peer.ID) *addrSegment { - b := []byte(id) +func (s *addrSegments) get(p peer.ID) *addrSegment { + b := []byte(p) return s[b[len(b)-1]] } diff --git a/pstoremem/protobook.go b/pstoremem/protobook.go index e401701..a112d00 100644 --- a/pstoremem/protobook.go +++ b/pstoremem/protobook.go @@ -20,8 +20,8 @@ type protoSegment struct { type protoSegments [256]*protoSegment -func (s *protoSegments) get(id peer.ID) *protoSegment { - b := []byte(id) +func (s *protoSegments) get(p peer.ID) *protoSegment { + b := []byte(p) return s[b[len(b)-1]] } From afca355218d264c8bb879e90ed84e2250a3743e3 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 17 May 2019 23:36:46 +0300 Subject: [PATCH 09/12] index the peer ID strings directly to avoid copying the byte slice --- pstoreds/protobook.go | 12 ++++-------- pstoremem/addr_book.go | 3 +-- pstoremem/protobook.go | 3 +-- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/pstoreds/protobook.go b/pstoreds/protobook.go index 27184f7..88e2883 100644 --- a/pstoreds/protobook.go +++ b/pstoreds/protobook.go @@ -21,23 +21,19 @@ func NewProtoBook(meta pstore.PeerMetadata) pstore.ProtoBook { } func (pb *dsProtoBook) Lock(p peer.ID) { - b := []byte(p) - pb.lks[b[len(b)-1]].Lock() + pb.lks[byte(p[len(p)-1])].Lock() } func (pb *dsProtoBook) Unlock(p peer.ID) { - b := []byte(p) - pb.lks[b[len(b)-1]].Unlock() + pb.lks[byte(p[len(p)-1])].Unlock() } func (pb *dsProtoBook) RLock(p peer.ID) { - b := []byte(p) - pb.lks[b[len(b)-1]].RLock() + pb.lks[byte(p[len(p)-1])].RLock() } func (pb *dsProtoBook) RUnlock(p peer.ID) { - b := []byte(p) - pb.lks[b[len(b)-1]].RUnlock() + pb.lks[byte(p[len(p)-1])].RUnlock() } func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error { diff --git a/pstoremem/addr_book.go b/pstoremem/addr_book.go index 2bc80f1..9278276 100644 --- a/pstoremem/addr_book.go +++ b/pstoremem/addr_book.go @@ -40,8 +40,7 @@ type addrSegment struct { } func (s *addrSegments) get(p peer.ID) *addrSegment { - b := []byte(p) - return s[b[len(b)-1]] + return s[byte(p[len(p)-1])] } // memoryAddrBook manages addresses. diff --git a/pstoremem/protobook.go b/pstoremem/protobook.go index a112d00..04cd145 100644 --- a/pstoremem/protobook.go +++ b/pstoremem/protobook.go @@ -21,8 +21,7 @@ type protoSegment struct { type protoSegments [256]*protoSegment func (s *protoSegments) get(p peer.ID) *protoSegment { - b := []byte(p) - return s[b[len(b)-1]] + return s[byte(p[len(p)-1])] } type memoryProtoBook struct { From 28b1a9f31d68871f2a841f3a15a9d41f1d1998f4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 17 May 2019 23:39:21 +0300 Subject: [PATCH 10/12] don't track segment sizes, it's a fruitless optimization --- pstoremem/addr_book.go | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/pstoremem/addr_book.go b/pstoremem/addr_book.go index 9278276..a497821 100644 --- a/pstoremem/addr_book.go +++ b/pstoremem/addr_book.go @@ -4,7 +4,6 @@ import ( "context" "sort" "sync" - "sync/atomic" "time" logging "github.com/ipfs/go-log" @@ -36,7 +35,6 @@ type addrSegment struct { // space unused. storing the *values* directly in the map will // drastically increase the space waste. In our case, by 6x. addrs map[peer.ID]map[string]*expiringAddr - size uint32 } func (s *addrSegments) get(p peer.ID) *addrSegment { @@ -110,19 +108,13 @@ func (mab *memoryAddrBook) gc() { delete(s.addrs, p) } } - atomic.StoreUint32(&s.size, uint32(len(s.addrs))) s.Unlock() } } func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice { - var length uint32 - for _, s := range mab.segments { - length += atomic.LoadUint32(&s.size) - } - - pids := make(peer.IDSlice, 0, length) + var pids peer.IDSlice for _, s := range mab.segments { s.RLock() for pid, _ := range s.addrs { @@ -151,9 +143,6 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du s.Lock() defer s.Unlock() - // update the segment size - defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) - amap := s.addrs[p] if amap == nil { amap = make(map[string]*expiringAddr, len(addrs)) @@ -187,9 +176,6 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du s.Lock() defer s.Unlock() - // update the segment size - defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) - amap := s.addrs[p] if amap == nil { amap = make(map[string]*expiringAddr, len(addrs)) @@ -221,9 +207,6 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t s.Lock() defer s.Unlock() - // update the segment size - defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) - amap, found := s.addrs[p] if !found { return @@ -267,9 +250,6 @@ func (mab *memoryAddrBook) ClearAddrs(p peer.ID) { s.Lock() defer s.Unlock() - // update the segment size - defer atomic.StoreUint32(&s.size, uint32(len(s.addrs))) - delete(s.addrs, p) } From 07ee3fb062bc7980359a57806ddf97bb795cb838 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 17 May 2019 23:41:05 +0300 Subject: [PATCH 11/12] make lock methods private --- pstoreds/protobook.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pstoreds/protobook.go b/pstoreds/protobook.go index 88e2883..02c865a 100644 --- a/pstoreds/protobook.go +++ b/pstoreds/protobook.go @@ -20,25 +20,25 @@ func NewProtoBook(meta pstore.PeerMetadata) pstore.ProtoBook { return &dsProtoBook{meta: meta} } -func (pb *dsProtoBook) Lock(p peer.ID) { +func (pb *dsProtoBook) lock(p peer.ID) { pb.lks[byte(p[len(p)-1])].Lock() } -func (pb *dsProtoBook) Unlock(p peer.ID) { +func (pb *dsProtoBook) unlock(p peer.ID) { pb.lks[byte(p[len(p)-1])].Unlock() } -func (pb *dsProtoBook) RLock(p peer.ID) { +func (pb *dsProtoBook) rlock(p peer.ID) { pb.lks[byte(p[len(p)-1])].RLock() } -func (pb *dsProtoBook) RUnlock(p peer.ID) { +func (pb *dsProtoBook) runlock(p peer.ID) { pb.lks[byte(p[len(p)-1])].RUnlock() } func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error { - pb.Lock(p) - defer pb.Unlock(p) + pb.lock(p) + defer pb.unlock(p) protomap := make(map[string]struct{}, len(protos)) for _, proto := range protos { @@ -49,8 +49,8 @@ func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error { } func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...string) error { - pb.Lock(p) - defer pb.Unlock(p) + pb.lock(p) + defer pb.unlock(p) pmap, err := pb.getProtocolMap(p) if err != nil { @@ -65,8 +65,8 @@ func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...string) error { } func (pb *dsProtoBook) GetProtocols(p peer.ID) ([]string, error) { - pb.RLock(p) - defer pb.RUnlock(p) + pb.rlock(p) + defer pb.runlock(p) pmap, err := pb.getProtocolMap(p) if err != nil { @@ -82,8 +82,8 @@ func (pb *dsProtoBook) GetProtocols(p peer.ID) ([]string, error) { } func (pb *dsProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) { - pb.RLock(p) - defer pb.RUnlock(p) + pb.rlock(p) + defer pb.runlock(p) pmap, err := pb.getProtocolMap(p) if err != nil { From 7fac6b6d984a05091900d9d8c798e880e64daf63 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 18 May 2019 11:42:47 +0300 Subject: [PATCH 12/12] use segments in ds protobook instead of a lock array --- pstoreds/protobook.go | 52 ++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/pstoreds/protobook.go b/pstoreds/protobook.go index 02c865a..9d86427 100644 --- a/pstoreds/protobook.go +++ b/pstoreds/protobook.go @@ -9,36 +9,38 @@ import ( pstore "github.com/libp2p/go-libp2p-peerstore" ) -type dsProtoBook struct { - lks [256]sync.RWMutex - meta pstore.PeerMetadata +type protoSegment struct { + sync.RWMutex } -var _ pstore.ProtoBook = (*dsProtoBook)(nil) - -func NewProtoBook(meta pstore.PeerMetadata) pstore.ProtoBook { - return &dsProtoBook{meta: meta} -} +type protoSegments [256]*protoSegment -func (pb *dsProtoBook) lock(p peer.ID) { - pb.lks[byte(p[len(p)-1])].Lock() +func (s *protoSegments) get(p peer.ID) *protoSegment { + return s[byte(p[len(p)-1])] } -func (pb *dsProtoBook) unlock(p peer.ID) { - pb.lks[byte(p[len(p)-1])].Unlock() +type dsProtoBook struct { + segments protoSegments + meta pstore.PeerMetadata } -func (pb *dsProtoBook) rlock(p peer.ID) { - pb.lks[byte(p[len(p)-1])].RLock() -} +var _ pstore.ProtoBook = (*dsProtoBook)(nil) -func (pb *dsProtoBook) runlock(p peer.ID) { - pb.lks[byte(p[len(p)-1])].RUnlock() +func NewProtoBook(meta pstore.PeerMetadata) pstore.ProtoBook { + return &dsProtoBook{ + meta: meta, + segments: func() (ret protoSegments) { + for i := range ret { + ret[i] = &protoSegment{} + } + return ret + }(), + } } func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error { - pb.lock(p) - defer pb.unlock(p) + pb.segments.get(p).Lock() + defer pb.segments.get(p).Unlock() protomap := make(map[string]struct{}, len(protos)) for _, proto := range protos { @@ -49,8 +51,8 @@ func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error { } func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...string) error { - pb.lock(p) - defer pb.unlock(p) + pb.segments.get(p).Lock() + defer pb.segments.get(p).Unlock() pmap, err := pb.getProtocolMap(p) if err != nil { @@ -65,8 +67,8 @@ func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...string) error { } func (pb *dsProtoBook) GetProtocols(p peer.ID) ([]string, error) { - pb.rlock(p) - defer pb.runlock(p) + pb.segments.get(p).RLock() + defer pb.segments.get(p).RUnlock() pmap, err := pb.getProtocolMap(p) if err != nil { @@ -82,8 +84,8 @@ func (pb *dsProtoBook) GetProtocols(p peer.ID) ([]string, error) { } func (pb *dsProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) { - pb.rlock(p) - defer pb.runlock(p) + pb.segments.get(p).RLock() + defer pb.segments.get(p).RUnlock() pmap, err := pb.getProtocolMap(p) if err != nil {