Skip to content

Commit

Permalink
query: fix error "leak"
Browse files Browse the repository at this point in the history
Long-running queries can build up large error sets that we never actually use.
This is exacerbated by libp2p/go-libp2p-swarm#115.

fixes libp2p/go-libp2p-swarm#119
  • Loading branch information
Stebalien committed Apr 24, 2019
1 parent ca611b1 commit 3c9f5bc
Showing 1 changed file with 11 additions and 21 deletions.
32 changes: 11 additions & 21 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package dht

import (
"context"
"errors"
"sync"

u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log"
todoctr "github.com/ipfs/go-todocounter"
process "github.com/jbenet/goprocess"
Expand All @@ -18,6 +18,9 @@ import (
notif "github.com/libp2p/go-libp2p-routing/notifications"
)

// ErrNoPeersQueried is returned when we failed to connect to any peers.
var ErrNoPeersQueried = errors.New("failed to query any peers")

var maxQueryConcurrency = AlphaValue

type dhtQuery struct {
Expand Down Expand Up @@ -77,7 +80,6 @@ type dhtQueryRunner struct {
peersRemaining todoctr.Counter // peersToQuery + currently processing

result *dhtQueryResult // query result
errs u.MultiErr // result errors. maybe should be a map[peer.ID]error

rateLimit chan struct{} // processing semaphore
log logging.EventLogger
Expand Down Expand Up @@ -155,23 +157,19 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
select {
case <-r.peersRemaining.Done():
r.proc.Close()
r.RLock()
defer r.RUnlock()

err = routing.ErrNotFound

// if every query to every peer failed, something must be very wrong.
if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
logger.Debugf("query errs: %s", r.errs)
err = r.errs[0]
if r.peersQueried.Size() == 0 {
err = ErrNoPeersQueried
} else {
err = routing.ErrNotFound
}

case <-r.proc.Closed():
r.RLock()
defer r.RUnlock()
err = r.runCtx.Err()
}

r.RLock()
defer r.RUnlock()

if r.result != nil && r.result.success {
return r.result, nil
}
Expand Down Expand Up @@ -257,10 +255,6 @@ func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
ID: p,
})

r.Lock()
r.errs = append(r.errs, err)
r.Unlock()

// This peer is dropping out of the race.
r.peersRemaining.Decrement(1)
return err
Expand Down Expand Up @@ -289,10 +283,6 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {

if err != nil {
logger.Debugf("ERROR worker for: %v %v", p, err)
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()

} else if res.success {
logger.Debugf("SUCCESS worker for: %v %s", p, res)
r.Lock()
Expand Down

0 comments on commit 3c9f5bc

Please sign in to comment.