From 043e62ab6338a3672e50773c1072c951719f1aab Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 8 Sep 2020 08:59:05 +0200 Subject: [PATCH 1/6] Don't use latency as initital estimate for blocksync Signed-off-by: Jakub Sztandera --- chain/blocksync/peer_tracker.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/chain/blocksync/peer_tracker.go b/chain/blocksync/peer_tracker.go index f1f6ede07ac..b6ff932c298 100644 --- a/chain/blocksync/peer_tracker.go +++ b/chain/blocksync/peer_tracker.go @@ -72,16 +72,7 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID { var costI, costJ float64 getPeerInitLat := func(p peer.ID) float64 { - var res float64 - if bpt.pmgr != nil { - if lat, ok := bpt.pmgr.GetPeerLatency(p); ok { - res = float64(lat) - } - } - if res == 0 { - res = float64(bpt.avgGlobalTime) - } - return res * newPeerMul + return float64(bpt.avgGlobalTime) * newPeerMul } if pi.successes+pi.failures > 0 { From 93176c91f4c20c8f11fdfeae5ee0c0807f60c13e Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 8 Sep 2020 09:06:31 +0200 Subject: [PATCH 2/6] Track time in relation to request size Signed-off-by: Jakub Sztandera --- chain/blocksync/client.go | 6 +++--- chain/blocksync/peer_tracker.go | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/chain/blocksync/client.go b/chain/blocksync/client.go index 38e1f6d2c2d..5c7fe425191 100644 --- a/chain/blocksync/client.go +++ b/chain/blocksync/client.go @@ -385,7 +385,7 @@ func (client *BlockSync) sendRequestToPeer( _ = stream.SetWriteDeadline(time.Now().Add(WRITE_REQ_DEADLINE)) if err := cborutil.WriteCborRPC(stream, req); err != nil { _ = stream.SetWriteDeadline(time.Time{}) - client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) + client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length) // FIXME: Should we also remove peer here? return nil, err } @@ -398,7 +398,7 @@ func (client *BlockSync) sendRequestToPeer( bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)), &res) if err != nil { - client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) + client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length) return nil, xerrors.Errorf("failed to read blocksync response: %w", err) } @@ -412,7 +412,7 @@ func (client *BlockSync) sendRequestToPeer( ) } - client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart)) + client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart), uint64(len(res.Chain))) // FIXME: We should really log a success only after we validate the response. // It might be a bit hard to do. return &res, nil diff --git a/chain/blocksync/peer_tracker.go b/chain/blocksync/peer_tracker.go index b6ff932c298..5ef8f9c6fe7 100644 --- a/chain/blocksync/peer_tracker.go +++ b/chain/blocksync/peer_tracker.go @@ -98,8 +98,8 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID { const ( // xInvAlpha = (N+1)/2 - localInvAlpha = 5 // 86% of the value is the last 9 - globalInvAlpha = 20 // 86% of the value is the last 39 + localInvAlpha = 10 // 86% of the value is the last 19 + globalInvAlpha = 25 // 86% of the value is the last 49 ) func (bpt *bsPeerTracker) logGlobalSuccess(dur time.Duration) { @@ -124,7 +124,7 @@ func logTime(pi *peerStats, dur time.Duration) { } -func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) { +func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration, reqSize uint64) { bpt.lk.Lock() defer bpt.lk.Unlock() @@ -136,10 +136,10 @@ func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) { } pi.successes++ - logTime(pi, dur) + logTime(pi, dur/time.Duration(reqSize)) } -func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) { +func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration, reqSize uint64) { bpt.lk.Lock() defer bpt.lk.Unlock() @@ -151,7 +151,7 @@ func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) { } pi.failures++ - logTime(pi, dur) + logTime(pi, dur/time.Duration(reqSize)) } func (bpt *bsPeerTracker) removePeer(p peer.ID) { From 74e577610aa344484a476fd3ba31d5828e3b7ecb Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 8 Sep 2020 10:18:51 +0200 Subject: [PATCH 3/6] Forward peers from hello to blocksync Signed-off-by: Jakub Sztandera --- chain/blocksync/client.go | 5 ++++- chain/blocksync/peer_tracker.go | 29 +++++++++++++++++++++++++++-- lib/peermgr/peermgr.go | 25 +++++++++++++++++++++---- 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/chain/blocksync/client.go b/chain/blocksync/client.go index 5c7fe425191..893759f6a0f 100644 --- a/chain/blocksync/client.go +++ b/chain/blocksync/client.go @@ -11,6 +11,7 @@ import ( inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "go.opencensus.io/trace" + "go.uber.org/fx" "golang.org/x/xerrors" cborutil "github.com/filecoin-project/go-cbor-util" @@ -36,12 +37,13 @@ type BlockSync struct { } func NewClient( + lc fx.Lifecycle, host host.Host, pmgr peermgr.MaybePeerMgr, ) *BlockSync { return &BlockSync{ host: host, - peerTracker: newPeerTracker(pmgr.Mgr), + peerTracker: newPeerTracker(lc, host, pmgr.Mgr), } } @@ -360,6 +362,7 @@ func (client *BlockSync) sendRequestToPeer( supported, err := client.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID) if err != nil { + client.RemovePeer(peer) return nil, xerrors.Errorf("failed to get protocols for peer: %w", err) } if len(supported) == 0 || supported[0] != BlockSyncProtocolID { diff --git a/chain/blocksync/peer_tracker.go b/chain/blocksync/peer_tracker.go index 5ef8f9c6fe7..3a9d9089b47 100644 --- a/chain/blocksync/peer_tracker.go +++ b/chain/blocksync/peer_tracker.go @@ -3,11 +3,14 @@ package blocksync // FIXME: This needs to be reviewed. import ( + "context" "sort" "sync" "time" + host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "go.uber.org/fx" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/lib/peermgr" @@ -29,11 +32,33 @@ type bsPeerTracker struct { pmgr *peermgr.PeerMgr } -func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker { - return &bsPeerTracker{ +func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeerTracker { + bsPt := &bsPeerTracker{ peers: make(map[peer.ID]*peerStats), pmgr: pmgr, } + + sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer)) + if err != nil { + panic(err) + } + go func() { + var newPeer interface{} + ok := true + for ok { + newPeer, ok = <-sub.Out() + log.Warnf("new peer from hello in tracker: %s", newPeer.(peermgr.NewFilPeer).Id) + bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id) + } + }() + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return sub.Close() + }, + }) + + return bsPt } func (bpt *bsPeerTracker) addPeer(p peer.ID) { diff --git a/lib/peermgr/peermgr.go b/lib/peermgr/peermgr.go index 80b05e8ce0a..a80e516c0f9 100644 --- a/lib/peermgr/peermgr.go +++ b/lib/peermgr/peermgr.go @@ -10,7 +10,10 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "go.opencensus.io/stats" "go.uber.org/fx" + "go.uber.org/multierr" + "golang.org/x/xerrors" + "github.com/libp2p/go-libp2p-core/event" host "github.com/libp2p/go-libp2p-core/host" net "github.com/libp2p/go-libp2p-core/network" peer "github.com/libp2p/go-libp2p-core/peer" @@ -50,12 +53,17 @@ type PeerMgr struct { h host.Host dht *dht.IpfsDHT - notifee *net.NotifyBundle + notifee *net.NotifyBundle + filPeerEmitter event.Emitter done chan struct{} } -func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) *PeerMgr { +type NewFilPeer struct { + Id peer.ID +} + +func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) { pm := &PeerMgr{ h: h, dht: dht, @@ -69,10 +77,18 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes done: make(chan struct{}), } + emitter, err := h.EventBus().Emitter(new(NewFilPeer)) + if err != nil { + return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err) + } + pm.filPeerEmitter = emitter lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { - return pm.Stop(ctx) + return multierr.Combine( + pm.filPeerEmitter.Close(), + pm.Stop(ctx), + ) }, }) @@ -84,10 +100,11 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes h.Network().Notify(pm.notifee) - return pm + return pm, nil } func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { + pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() pmgr.peers[p] = time.Duration(0) From ba9678bd610ade808547497e3efff6ce7487370c Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 8 Sep 2020 12:11:30 +0200 Subject: [PATCH 4/6] Use for to iterate over channel Signed-off-by: Jakub Sztandera --- chain/blocksync/peer_tracker.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/chain/blocksync/peer_tracker.go b/chain/blocksync/peer_tracker.go index 3a9d9089b47..7c30b92a275 100644 --- a/chain/blocksync/peer_tracker.go +++ b/chain/blocksync/peer_tracker.go @@ -42,12 +42,9 @@ func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeer if err != nil { panic(err) } + go func() { - var newPeer interface{} - ok := true - for ok { - newPeer, ok = <-sub.Out() - log.Warnf("new peer from hello in tracker: %s", newPeer.(peermgr.NewFilPeer).Id) + for newPeer := range sub.Out() { bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id) } }() From 4fce0181ab303043d7c660f33b2263b48adb922e Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 8 Sep 2020 12:18:48 +0200 Subject: [PATCH 5/6] Ignore the linter Signed-off-by: Jakub Sztandera --- lib/peermgr/peermgr.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/peermgr/peermgr.go b/lib/peermgr/peermgr.go index a80e516c0f9..2f9d3467499 100644 --- a/lib/peermgr/peermgr.go +++ b/lib/peermgr/peermgr.go @@ -104,7 +104,7 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes } func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { - pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) + _ = pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck pmgr.peersLk.Lock() defer pmgr.peersLk.Unlock() pmgr.peers[p] = time.Duration(0) From b66058e417b4ff391b9fd6def9f4daa0517e216f Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 8 Sep 2020 12:19:37 +0200 Subject: [PATCH 6/6] Add 0 guard Signed-off-by: Jakub Sztandera --- chain/blocksync/peer_tracker.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/chain/blocksync/peer_tracker.go b/chain/blocksync/peer_tracker.go index 7c30b92a275..bb350aa5115 100644 --- a/chain/blocksync/peer_tracker.go +++ b/chain/blocksync/peer_tracker.go @@ -158,6 +158,9 @@ func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration, reqSize uint6 } pi.successes++ + if reqSize == 0 { + reqSize = 1 + } logTime(pi, dur/time.Duration(reqSize)) } @@ -173,6 +176,9 @@ func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration, reqSize uint6 } pi.failures++ + if reqSize == 0 { + reqSize = 1 + } logTime(pi, dur/time.Duration(reqSize)) }