Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

add user-defined conn select alg and user-defined dest select alg #114

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions best.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package swarm

import (
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)

// BestConn supports user-defined connection selection algorithm
// BestConnFallback is BestConn's downgrade algorithm. When BestConn
// cann't select a connection, we downgrade using BestConnFallback to
// select the connection.
type BestConn interface {
BestConn(peer.ID, []*Conn) *Conn
BestConnFallback(peer.ID, []*Conn) *Conn
}

// If there is multiple good address can connect to the peer,
// We use this interface to select the best address to peer.
type BestDest interface {
BestDestSelect(peer.ID, []ma.Multiaddr) []ma.Multiaddr
}
42 changes: 42 additions & 0 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
pstore "github.com/libp2p/go-libp2p-peerstore"
transport "github.com/libp2p/go-libp2p-transport"
filter "github.com/libp2p/go-maddr-filter"
ma "github.com/multiformats/go-multiaddr"
mafilter "github.com/whyrusleeping/multiaddr-filter"
)

Expand Down Expand Up @@ -81,6 +82,9 @@ type Swarm struct {
// filters for addresses that shouldnt be dialed (or accepted)
Filters *filter.Filters

bestConn BestConn
bestDest BestDest

proc goprocess.Process
ctx context.Context
bwc metrics.Reporter
Expand Down Expand Up @@ -274,6 +278,16 @@ func (s *Swarm) StreamHandler() inet.StreamHandler {
return handler
}

// SetBestConn set the BestConn interface
func (s *Swarm) SetBestConn(bc BestConn) {
s.bestConn = bc
}

// SetBestDest set the BestDest interface
func (s *Swarm) SetBestDest(bd BestDest) {
s.bestDest = bd
}

// NewStream creates a new stream on any available connection to peer, dialing
// if necessary.
func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) {
Expand Down Expand Up @@ -359,6 +373,34 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
return best
}

// Wrapper for BestConn Interface
func (s *Swarm) bestConnToPeerWrapper(p peer.ID) *Conn {
if s.bestConn == nil {
return s.bestConnToPeer(p)
}
s.conns.RLock()
defer s.conns.RUnlock()
return s.bestConn.BestConn(p, s.conns.m[p])
}

// Wrapper for BestConnFallback Interface
func (s *Swarm) bestConnToPeerFallbackWrapper(p peer.ID) *Conn {
if s.bestConn == nil {
return s.bestConnToPeer(p)
}
s.conns.RLock()
defer s.conns.RUnlock()
return s.bestConn.BestConnFallback(p, s.conns.m[p])
}

// Wrapper for BestDest Interface
func (s *Swarm) bestDestSelectWrapper(id peer.ID, addrs []ma.Multiaddr) []ma.Multiaddr {
if s.bestDest == nil {
return addrs
}
return s.bestDest.BestDestSelect(id, addrs)
}

// Connectedness returns our "connectedness" state with the given peer.
//
// To check if we have an open connection, use `s.Connectedness(p) ==
Expand Down
4 changes: 4 additions & 0 deletions swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (c *Conn) Close() error {
return c.err
}

func (c *Conn) IsClosed() bool {
return c.conn.IsClosed()
}

func (c *Conn) doClose() {
c.swarm.removeConn(c)

Expand Down
15 changes: 12 additions & 3 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()

// check if we already have an open connection first
conn := s.bestConnToPeer(p)
conn := s.bestConnToPeerWrapper(p)
if conn != nil {
return conn, nil
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
// Short circuit.
// By the time we take the dial lock, we may already *have* a connection
// to the peer.
c := s.bestConnToPeer(p)
c := s.bestConnToPeerWrapper(p)
if c != nil {
return c, nil
}
Expand All @@ -239,7 +239,7 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {

conn, err := s.dial(ctx, p)
if err != nil {
conn = s.bestConnToPeer(p)
conn = s.bestConnToPeerFallbackWrapper(p)
if conn != nil {
// Hm? What error?
// Could have canceled the dial because we received a
Expand Down Expand Up @@ -294,9 +294,18 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, errors.New("no addresses")
}
goodAddrs := s.filterKnownUndialables(peerAddrs)

if len(goodAddrs) == 0 {
return nil, errors.New("no good addresses")
}

if s.bestDest != nil {
// Select the best address to peer.
bestAddrs := s.bestDestSelectWrapper(p, goodAddrs)
if len(bestAddrs) != 0 {
goodAddrs = bestAddrs
}
}
goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
for _, a := range goodAddrs {
goodAddrsChan <- a
Expand Down