From 4d0aa5eb94a5fbef35abc7459cc0ec2f2df327a7 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 24 Apr 2019 08:29:00 -0700 Subject: [PATCH] query: fix error "leak" Long-running queries can build up large error sets that we never actually use. This is exacerbated by https://github.com/libp2p/go-libp2p-swarm/pull/115. fixes https://github.com/libp2p/go-libp2p-swarm/issues/119 --- query.go | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/query.go b/query.go index 9e9c35968..e7a8c1cbc 100644 --- a/query.go +++ b/query.go @@ -2,9 +2,9 @@ package dht import ( "context" + "errors" "sync" - u "github.com/ipfs/go-ipfs-util" logging "github.com/ipfs/go-log" todoctr "github.com/ipfs/go-todocounter" process "github.com/jbenet/goprocess" @@ -18,6 +18,9 @@ import ( notif "github.com/libp2p/go-libp2p-routing/notifications" ) +// ErrNoPeersQueried is returned when we failed to connect to any peers. +var ErrNoPeersQueried = errors.New("failed to query any peers") + var maxQueryConcurrency = AlphaValue type dhtQuery struct { @@ -77,7 +80,6 @@ type dhtQueryRunner struct { peersRemaining todoctr.Counter // peersToQuery + currently processing result *dhtQueryResult // query result - errs u.MultiErr // result errors. maybe should be a map[peer.ID]error rateLimit chan struct{} // processing semaphore log logging.EventLogger @@ -155,23 +157,19 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes select { case <-r.peersRemaining.Done(): r.proc.Close() - r.RLock() - defer r.RUnlock() - - err = routing.ErrNotFound - - // if every query to every peer failed, something must be very wrong. - if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() { - logger.Debugf("query errs: %s", r.errs) - err = r.errs[0] + if r.peersQueried.Size() == 0 { + err = ErrNoPeersQueried + } else { + err = routing.ErrNotFound } case <-r.proc.Closed(): - r.RLock() - defer r.RUnlock() err = r.runCtx.Err() } + r.RLock() + defer r.RUnlock() + if r.result != nil && r.result.success { return r.result, nil } @@ -257,10 +255,6 @@ func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error { ID: p, }) - r.Lock() - r.errs = append(r.errs, err) - r.Unlock() - // This peer is dropping out of the race. r.peersRemaining.Decrement(1) return err @@ -289,10 +283,6 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { if err != nil { logger.Debugf("ERROR worker for: %v %v", p, err) - r.Lock() - r.errs = append(r.errs, err) - r.Unlock() - } else if res.success { logger.Debugf("SUCCESS worker for: %v %s", p, res) r.Lock()