diff --git a/interface.go b/interface.go index c669ba9..e3d988c 100644 --- a/interface.go +++ b/interface.go @@ -56,17 +56,13 @@ type Peerstore interface { KeyBook PeerMetadata Metrics + ProtoBook // PeerInfo returns a peer.PeerInfo struct for given peer.ID. // This is a small slice of the information Peerstore has on // that peer, useful to other services. PeerInfo(peer.ID) PeerInfo - GetProtocols(peer.ID) ([]string, error) - AddProtocols(peer.ID, ...string) error - SetProtocols(peer.ID, ...string) error - SupportsProtocols(peer.ID, ...string) ([]string, error) - // Peers returns all of the peer IDs stored across all inner stores. Peers() peer.IDSlice } @@ -142,3 +138,11 @@ type KeyBook interface { // PeersWithKeys returns all the peer IDs stored in the KeyBook PeersWithKeys() peer.IDSlice } + +// ProtoBook tracks the protocols supported by peers +type ProtoBook interface { + GetProtocols(peer.ID) ([]string, error) + AddProtocols(peer.ID, ...string) error + SetProtocols(peer.ID, ...string) error + SupportsProtocols(peer.ID, ...string) ([]string, error) +} diff --git a/peerstore.go b/peerstore.go index f7f87b8..ea29a17 100644 --- a/peerstore.go +++ b/peerstore.go @@ -3,37 +3,30 @@ package peerstore import ( "fmt" "io" - "sync" peer "github.com/libp2p/go-libp2p-peer" ) var _ Peerstore = (*peerstore)(nil) -const maxInternedProtocols = 512 -const maxInternedProtocolSize = 256 - type peerstore struct { Metrics KeyBook AddrBook + ProtoBook PeerMetadata - - // lock for protocol information, separate from datastore lock - protolock sync.RWMutex - internedProtocols map[string]string } // NewPeerstore creates a data structure that stores peer data, backed by the // supplied implementations of KeyBook, AddrBook and PeerMetadata. -func NewPeerstore(kb KeyBook, ab AddrBook, md PeerMetadata) Peerstore { +func NewPeerstore(kb KeyBook, ab AddrBook, pb ProtoBook, md PeerMetadata) Peerstore { return &peerstore{ - KeyBook: kb, - AddrBook: ab, - PeerMetadata: md, - Metrics: NewMetrics(), - internedProtocols: make(map[string]string), + KeyBook: kb, + AddrBook: ab, + ProtoBook: pb, + PeerMetadata: md, + Metrics: NewMetrics(), } } @@ -49,6 +42,7 @@ func (ps *peerstore) Close() (err error) { weakClose("keybook", ps.KeyBook) weakClose("addressbook", ps.AddrBook) + weakClose("protobook", ps.ProtoBook) weakClose("peermetadata", ps.PeerMetadata) if len(errs) > 0 { @@ -80,101 +74,6 @@ func (ps *peerstore) PeerInfo(p peer.ID) PeerInfo { } } -func (ps *peerstore) internProtocol(s string) string { - if len(s) > maxInternedProtocolSize { - return s - } - - if interned, ok := ps.internedProtocols[s]; ok { - return interned - } - - if len(ps.internedProtocols) >= maxInternedProtocols { - ps.internedProtocols = make(map[string]string, maxInternedProtocols) - } - - ps.internedProtocols[s] = s - return s -} - -func (ps *peerstore) SetProtocols(p peer.ID, protos ...string) error { - ps.protolock.Lock() - defer ps.protolock.Unlock() - - protomap := make(map[string]struct{}, len(protos)) - for _, proto := range protos { - protomap[ps.internProtocol(proto)] = struct{}{} - } - - return ps.Put(p, "protocols", protomap) -} - -func (ps *peerstore) AddProtocols(p peer.ID, protos ...string) error { - ps.protolock.Lock() - defer ps.protolock.Unlock() - protomap, err := ps.getProtocolMap(p) - if err != nil { - return err - } - - for _, proto := range protos { - protomap[ps.internProtocol(proto)] = struct{}{} - } - - return ps.Put(p, "protocols", protomap) -} - -func (ps *peerstore) getProtocolMap(p peer.ID) (map[string]struct{}, error) { - iprotomap, err := ps.Get(p, "protocols") - switch err { - default: - return nil, err - case ErrNotFound: - return make(map[string]struct{}), nil - case nil: - cast, ok := iprotomap.(map[string]struct{}) - if !ok { - return nil, fmt.Errorf("stored protocol set was not a map") - } - - return cast, nil - } -} - -func (ps *peerstore) GetProtocols(p peer.ID) ([]string, error) { - ps.protolock.RLock() - defer ps.protolock.RUnlock() - pmap, err := ps.getProtocolMap(p) - if err != nil { - return nil, err - } - - out := make([]string, 0, len(pmap)) - for k := range pmap { - out = append(out, k) - } - - return out, nil -} - -func (ps *peerstore) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) { - ps.protolock.RLock() - defer ps.protolock.RUnlock() - pmap, err := ps.getProtocolMap(p) - if err != nil { - return nil, err - } - - out := make([]string, 0, len(protos)) - for _, proto := range protos { - if _, ok := pmap[proto]; ok { - out = append(out, proto) - } - } - - return out, nil -} - func PeerInfos(ps Peerstore, peers peer.IDSlice) []PeerInfo { pi := make([]PeerInfo, len(peers)) for i, p := range peers { diff --git a/pstoreds/peerstore.go b/pstoreds/peerstore.go index be7b09e..b4560f7 100644 --- a/pstoreds/peerstore.go +++ b/pstoreds/peerstore.go @@ -63,7 +63,9 @@ func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (pstore. return nil, err } - ps := pstore.NewPeerstore(keyBook, addrBook, peerMetadata) + protoBook := NewProtoBook(peerMetadata) + + ps := pstore.NewPeerstore(keyBook, addrBook, protoBook, peerMetadata) return ps, nil } diff --git a/pstoreds/protobook.go b/pstoreds/protobook.go new file mode 100644 index 0000000..9d86427 --- /dev/null +++ b/pstoreds/protobook.go @@ -0,0 +1,120 @@ +package pstoreds + +import ( + "fmt" + "sync" + + peer "github.com/libp2p/go-libp2p-peer" + + pstore "github.com/libp2p/go-libp2p-peerstore" +) + +type protoSegment struct { + sync.RWMutex +} + +type protoSegments [256]*protoSegment + +func (s *protoSegments) get(p peer.ID) *protoSegment { + return s[byte(p[len(p)-1])] +} + +type dsProtoBook struct { + segments protoSegments + meta pstore.PeerMetadata +} + +var _ pstore.ProtoBook = (*dsProtoBook)(nil) + +func NewProtoBook(meta pstore.PeerMetadata) pstore.ProtoBook { + return &dsProtoBook{ + meta: meta, + segments: func() (ret protoSegments) { + for i := range ret { + ret[i] = &protoSegment{} + } + return ret + }(), + } +} + +func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error { + pb.segments.get(p).Lock() + defer pb.segments.get(p).Unlock() + + protomap := make(map[string]struct{}, len(protos)) + for _, proto := range protos { + protomap[proto] = struct{}{} + } + + return pb.meta.Put(p, "protocols", protomap) +} + +func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...string) error { + pb.segments.get(p).Lock() + defer pb.segments.get(p).Unlock() + + pmap, err := pb.getProtocolMap(p) + if err != nil { + return err + } + + for _, proto := range protos { + pmap[proto] = struct{}{} + } + + return pb.meta.Put(p, "protocols", pmap) +} + +func (pb *dsProtoBook) GetProtocols(p peer.ID) ([]string, error) { + pb.segments.get(p).RLock() + defer pb.segments.get(p).RUnlock() + + pmap, err := pb.getProtocolMap(p) + if err != nil { + return nil, err + } + + res := make([]string, 0, len(pmap)) + for proto := range pmap { + res = append(res, proto) + } + + return res, nil +} + +func (pb *dsProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) { + pb.segments.get(p).RLock() + defer pb.segments.get(p).RUnlock() + + pmap, err := pb.getProtocolMap(p) + if err != nil { + return nil, err + } + + res := make([]string, 0, len(protos)) + for _, proto := range protos { + if _, ok := pmap[proto]; ok { + res = append(res, proto) + } + } + + return res, nil +} + +func (pb *dsProtoBook) getProtocolMap(p peer.ID) (map[string]struct{}, error) { + iprotomap, err := pb.meta.Get(p, "protocols") + switch err { + default: + return nil, err + case pstore.ErrNotFound: + return make(map[string]struct{}), nil + case nil: + cast, ok := iprotomap.(map[string]struct{}) + if !ok { + return nil, fmt.Errorf("stored protocol set was not a map") + } + + return cast, nil + } +} diff --git a/pstoremem/addr_book.go b/pstoremem/addr_book.go index cb9576a..a497821 100644 --- a/pstoremem/addr_book.go +++ b/pstoremem/addr_book.go @@ -26,15 +26,24 @@ func (e *expiringAddr) ExpiredBy(t time.Time) bool { return t.After(e.Expires) } -var _ pstore.AddrBook = (*memoryAddrBook)(nil) +type addrSegments [256]*addrSegment + +type addrSegment struct { + sync.RWMutex -// memoryAddrBook manages addresses. -type memoryAddrBook struct { - addrmu sync.RWMutex // Use pointers to save memory. Maps always leave some fraction of their // space unused. storing the *values* directly in the map will // drastically increase the space waste. In our case, by 6x. addrs map[peer.ID]map[string]*expiringAddr +} + +func (s *addrSegments) get(p peer.ID) *addrSegment { + return s[byte(p[len(p)-1])] +} + +// memoryAddrBook manages addresses. +type memoryAddrBook struct { + segments addrSegments ctx context.Context cancel func() @@ -42,11 +51,18 @@ type memoryAddrBook struct { subManager *AddrSubManager } +var _ pstore.AddrBook = (*memoryAddrBook)(nil) + func NewAddrBook() pstore.AddrBook { ctx, cancel := context.WithCancel(context.Background()) ab := &memoryAddrBook{ - addrs: make(map[peer.ID]map[string]*expiringAddr), + segments: func() (ret addrSegments) { + for i, _ := range ret { + ret[i] = &addrSegment{addrs: make(map[peer.ID]map[string]*expiringAddr)} + } + return ret + }(), subManager: NewAddrSubManager(), ctx: ctx, cancel: cancel, @@ -79,29 +95,32 @@ func (mab *memoryAddrBook) Close() error { // gc garbage collects the in-memory address book. func (mab *memoryAddrBook) gc() { - mab.addrmu.Lock() - defer mab.addrmu.Unlock() - now := time.Now() - for p, amap := range mab.addrs { - for k, addr := range amap { - if addr.ExpiredBy(now) { - delete(amap, k) + for _, s := range mab.segments { + s.Lock() + for p, amap := range s.addrs { + for k, addr := range amap { + if addr.ExpiredBy(now) { + delete(amap, k) + } + } + if len(amap) == 0 { + delete(s.addrs, p) } } - if len(amap) == 0 { - delete(mab.addrs, p) - } + s.Unlock() } + } func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice { - mab.addrmu.RLock() - defer mab.addrmu.RUnlock() - - pids := make(peer.IDSlice, 0, len(mab.addrs)) - for pid := range mab.addrs { - pids = append(pids, pid) + var pids peer.IDSlice + for _, s := range mab.segments { + s.RLock() + for pid, _ := range s.addrs { + pids = append(pids, pid) + } + s.RUnlock() } return pids } @@ -115,18 +134,19 @@ func (mab *memoryAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati // (time-to-live), after which the address is no longer valid. // If the manager has a longer TTL, the operation is a no-op for that address func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - mab.addrmu.Lock() - defer mab.addrmu.Unlock() - // if ttl is zero, exit. nothing to do. if ttl <= 0 { return } - amap := mab.addrs[p] + s := mab.segments.get(p) + s.Lock() + defer s.Unlock() + + amap := s.addrs[p] if amap == nil { amap = make(map[string]*expiringAddr, len(addrs)) - mab.addrs[p] = amap + s.addrs[p] = amap } exp := time.Now().Add(ttl) for _, addr := range addrs { @@ -152,13 +172,14 @@ func (mab *memoryAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati // SetAddrs sets the ttl on addresses. This clears any TTL there previously. // This is used when we receive the best estimate of the validity of an address. func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { - mab.addrmu.Lock() - defer mab.addrmu.Unlock() + s := mab.segments.get(p) + s.Lock() + defer s.Unlock() - amap := mab.addrs[p] + amap := s.addrs[p] if amap == nil { amap = make(map[string]*expiringAddr, len(addrs)) - mab.addrs[p] = amap + s.addrs[p] = amap } exp := time.Now().Add(ttl) @@ -172,7 +193,6 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du if ttl > 0 { amap[addrstr] = &expiringAddr{Addr: addr, Expires: exp, TTL: ttl} - mab.subManager.BroadcastAddr(p, addr) } else { delete(amap, addrstr) @@ -183,10 +203,11 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du // UpdateAddrs updates the addresses associated with the given peer that have // the given oldTTL to have the given newTTL. func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { - mab.addrmu.Lock() - defer mab.addrmu.Unlock() + s := mab.segments.get(p) + s.Lock() + defer s.Unlock() - amap, found := mab.addrs[p] + amap, found := s.addrs[p] if !found { return } @@ -203,10 +224,11 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t // Addresses returns all known (and valid) addresses for a given func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { - mab.addrmu.RLock() - defer mab.addrmu.RUnlock() + s := mab.segments.get(p) + s.RLock() + defer s.RUnlock() - amap, found := mab.addrs[p] + amap, found := s.addrs[p] if !found { return nil } @@ -224,19 +246,21 @@ func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr { // ClearAddrs removes all previously stored addresses func (mab *memoryAddrBook) ClearAddrs(p peer.ID) { - mab.addrmu.Lock() - defer mab.addrmu.Unlock() + s := mab.segments.get(p) + s.Lock() + defer s.Unlock() - delete(mab.addrs, p) + delete(s.addrs, p) } // AddrStream returns a channel on which all new addresses discovered for a // given peer ID will be published. func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { - mab.addrmu.RLock() - defer mab.addrmu.RUnlock() + s := mab.segments.get(p) + s.RLock() + defer s.RUnlock() - baseaddrslice := mab.addrs[p] + baseaddrslice := s.addrs[p] initial := make([]ma.Multiaddr, 0, len(baseaddrslice)) for _, a := range baseaddrslice { initial = append(initial, a.Addr) diff --git a/pstoremem/peerstore.go b/pstoremem/peerstore.go index 7d87313..c7cbd67 100644 --- a/pstoremem/peerstore.go +++ b/pstoremem/peerstore.go @@ -7,5 +7,6 @@ func NewPeerstore() pstore.Peerstore { return pstore.NewPeerstore( NewKeyBook(), NewAddrBook(), + NewProtoBook(), NewPeerMetadata()) } diff --git a/pstoremem/protobook.go b/pstoremem/protobook.go new file mode 100644 index 0000000..04cd145 --- /dev/null +++ b/pstoremem/protobook.go @@ -0,0 +1,142 @@ +package pstoremem + +import ( + "sync" + + peer "github.com/libp2p/go-libp2p-peer" + + pstore "github.com/libp2p/go-libp2p-peerstore" +) + +const ( + maxInternedProtocols = 512 + maxInternedProtocolSize = 256 +) + +type protoSegment struct { + sync.RWMutex + protocols map[peer.ID]map[string]struct{} +} + +type protoSegments [256]*protoSegment + +func (s *protoSegments) get(p peer.ID) *protoSegment { + return s[byte(p[len(p)-1])] +} + +type memoryProtoBook struct { + segments protoSegments + + lk sync.RWMutex + interned map[string]string +} + +var _ pstore.ProtoBook = (*memoryProtoBook)(nil) + +func NewProtoBook() pstore.ProtoBook { + return &memoryProtoBook{ + interned: make(map[string]string, maxInternedProtocols), + segments: func() (ret protoSegments) { + for i := range ret { + ret[i] = &protoSegment{ + protocols: make(map[peer.ID]map[string]struct{}), + } + } + return ret + }(), + } +} + +func (pb *memoryProtoBook) internProtocol(proto string) string { + if len(proto) > maxInternedProtocolSize { + return proto + } + + // check if it is interned with the read lock + pb.lk.RLock() + interned, ok := pb.interned[proto] + pb.lk.RUnlock() + + if ok { + return interned + } + + // intern with the write lock + pb.lk.Lock() + defer pb.lk.Unlock() + + // check again in case it got interned in between locks + interned, ok = pb.interned[proto] + if ok { + return interned + } + + // if we've filled the table, throw it away and start over + if len(pb.interned) >= maxInternedProtocols { + pb.interned = make(map[string]string, maxInternedProtocols) + } + + pb.interned[proto] = proto + return proto +} + +func (pb *memoryProtoBook) SetProtocols(p peer.ID, protos ...string) error { + s := pb.segments.get(p) + s.Lock() + defer s.Unlock() + + newprotos := make(map[string]struct{}, len(protos)) + for _, proto := range protos { + newprotos[pb.internProtocol(proto)] = struct{}{} + } + + s.protocols[p] = newprotos + + return nil +} + +func (pb *memoryProtoBook) AddProtocols(p peer.ID, protos ...string) error { + s := pb.segments.get(p) + s.Lock() + defer s.Unlock() + + protomap, ok := s.protocols[p] + if !ok { + protomap = make(map[string]struct{}) + s.protocols[p] = protomap + } + + for _, proto := range protos { + protomap[pb.internProtocol(proto)] = struct{}{} + } + + return nil +} + +func (pb *memoryProtoBook) GetProtocols(p peer.ID) ([]string, error) { + s := pb.segments.get(p) + s.RLock() + defer s.RUnlock() + + out := make([]string, 0, len(s.protocols)) + for k := range s.protocols[p] { + out = append(out, k) + } + + return out, nil +} + +func (pb *memoryProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) { + s := pb.segments.get(p) + s.RLock() + defer s.RUnlock() + + out := make([]string, 0, len(protos)) + for _, proto := range protos { + if _, ok := s.protocols[p][proto]; ok { + out = append(out, proto) + } + } + + return out, nil +}