From 8beff38d229500aa0af2873e1128c3e702103cce Mon Sep 17 00:00:00 2001 From: lnykww Date: Wed, 20 Mar 2019 11:35:29 +0800 Subject: [PATCH] add user-defined conn select alg and user-defined dest select alg --- best.go | 21 +++++++++++++++++++++ swarm.go | 42 ++++++++++++++++++++++++++++++++++++++++++ swarm_conn.go | 4 ++++ swarm_dial.go | 15 ++++++++++++--- 4 files changed, 79 insertions(+), 3 deletions(-) create mode 100644 best.go diff --git a/best.go b/best.go new file mode 100644 index 00000000..187ebeb5 --- /dev/null +++ b/best.go @@ -0,0 +1,21 @@ +package swarm + +import ( + peer "github.com/libp2p/go-libp2p-peer" + ma "github.com/multiformats/go-multiaddr" +) + +// BestConn supports user-defined connection selection algorithm +// BestConnFallback is BestConn's downgrade algorithm. When BestConn +// cann't select a connection, we downgrade using BestConnFallback to +// select the connection. +type BestConn interface { + BestConn(peer.ID, []*Conn) *Conn + BestConnFallback(peer.ID, []*Conn) *Conn +} + +// If there is multiple good address can connect to the peer, +// We use this interface to select the best address to peer. +type BestDest interface { + BestDestSelect(peer.ID, []ma.Multiaddr) []ma.Multiaddr +} diff --git a/swarm.go b/swarm.go index cc77e80e..c3c42cae 100644 --- a/swarm.go +++ b/swarm.go @@ -18,6 +18,7 @@ import ( pstore "github.com/libp2p/go-libp2p-peerstore" transport "github.com/libp2p/go-libp2p-transport" filter "github.com/libp2p/go-maddr-filter" + ma "github.com/multiformats/go-multiaddr" mafilter "github.com/whyrusleeping/multiaddr-filter" ) @@ -81,6 +82,9 @@ type Swarm struct { // filters for addresses that shouldnt be dialed (or accepted) Filters *filter.Filters + bestConn BestConn + bestDest BestDest + proc goprocess.Process ctx context.Context bwc metrics.Reporter @@ -274,6 +278,16 @@ func (s *Swarm) StreamHandler() inet.StreamHandler { return handler } +// SetBestConn set the BestConn interface +func (s *Swarm) SetBestConn(bc BestConn) { + s.bestConn = bc +} + +// SetBestDest set the BestDest interface +func (s *Swarm) SetBestDest(bd BestDest) { + s.bestDest = bd +} + // NewStream creates a new stream on any available connection to peer, dialing // if necessary. func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) { @@ -359,6 +373,34 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn { return best } +// Wrapper for BestConn Interface +func (s *Swarm) bestConnToPeerWrapper(p peer.ID) *Conn { + if s.bestConn == nil { + return s.bestConnToPeer(p) + } + s.conns.RLock() + defer s.conns.RUnlock() + return s.bestConn.BestConn(p, s.conns.m[p]) +} + +// Wrapper for BestConnFallback Interface +func (s *Swarm) bestConnToPeerFallbackWrapper(p peer.ID) *Conn { + if s.bestConn == nil { + return s.bestConnToPeer(p) + } + s.conns.RLock() + defer s.conns.RUnlock() + return s.bestConn.BestConnFallback(p, s.conns.m[p]) +} + +// Wrapper for BestDest Interface +func (s *Swarm) bestDestSelectWrapper(id peer.ID, addrs []ma.Multiaddr) []ma.Multiaddr { + if s.bestDest == nil { + return addrs + } + return s.bestDest.BestDestSelect(id, addrs) +} + // Connectedness returns our "connectedness" state with the given peer. // // To check if we have an open connection, use `s.Connectedness(p) == diff --git a/swarm_conn.go b/swarm_conn.go index 26a7794d..9bb3303b 100644 --- a/swarm_conn.go +++ b/swarm_conn.go @@ -48,6 +48,10 @@ func (c *Conn) Close() error { return c.err } +func (c *Conn) IsClosed() bool { + return c.conn.IsClosed() +} + func (c *Conn) doClose() { c.swarm.removeConn(c) diff --git a/swarm_dial.go b/swarm_dial.go index 8a3ba491..e417f1cf 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -196,7 +196,7 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done() // check if we already have an open connection first - conn := s.bestConnToPeer(p) + conn := s.bestConnToPeerWrapper(p) if conn != nil { return conn, nil } @@ -226,7 +226,7 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { // Short circuit. // By the time we take the dial lock, we may already *have* a connection // to the peer. - c := s.bestConnToPeer(p) + c := s.bestConnToPeerWrapper(p) if c != nil { return c, nil } @@ -239,7 +239,7 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { conn, err := s.dial(ctx, p) if err != nil { - conn = s.bestConnToPeer(p) + conn = s.bestConnToPeerFallbackWrapper(p) if conn != nil { // Hm? What error? // Could have canceled the dial because we received a @@ -294,9 +294,18 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return nil, errors.New("no addresses") } goodAddrs := s.filterKnownUndialables(peerAddrs) + if len(goodAddrs) == 0 { return nil, errors.New("no good addresses") } + + if s.bestDest != nil { + // Select the best address to peer. + bestAddrs := s.bestDestSelectWrapper(p, goodAddrs) + if len(bestAddrs) != 0 { + goodAddrs = bestAddrs + } + } goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs)) for _, a := range goodAddrs { goodAddrsChan <- a