Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Oct 12, 2020
1 parent ff267da commit 8b54cba
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
5 changes: 3 additions & 2 deletions crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func New(host host.Host, opts ...Option) (*Crawler, error) {
}
}

pm, err := wire.NewProtocolMessenger(&messageSender{h: host})
pm, err := wire.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -123,14 +123,15 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
wg.Add(c.parallelism)
for i := 0; i < c.parallelism; i++ {
go func() {
defer wg.Done()
for p := range jobs {
res := queryPeer(ctx, c.host, c.dhtRPC, p)
results <- res
}
}()
}

defer wg.Done()
defer wg.Wait()
defer close(jobs)

toDial := make([]*peer.AddrInfo, 0, 1000)
Expand Down
12 changes: 11 additions & 1 deletion crawler/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type options struct {
protocols []protocol.ID
parallelism int
crawlInterval time.Duration
perMsgTimeout time.Duration
}

// defaults are the default crawler options. This option will be automatically
Expand All @@ -20,14 +21,15 @@ var defaults = func(o *options) error {
o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"}
o.parallelism = 1000
o.crawlInterval = time.Hour
o.perMsgTimeout = time.Second * 5

return nil
}

// WithProtocols defines the ordered set of protocols the crawler will use to talk to other nodes
func WithProtocols(protocols []protocol.ID) Option {
return func(o *options) error {
o.protocols = protocols
o.protocols = append([]protocol.ID{}, protocols...)
return nil
}
}
Expand All @@ -39,3 +41,11 @@ func WithParallelism(parallelism int) Option {
return nil
}
}

// WithMsgTimeout defines the amount of time a single DHT message is allowed to take before it's deemed failed
func WithMsgTimeout(timeout time.Duration) Option {
return func(o *options) error {
o.perMsgTimeout = timeout
return nil
}
}

0 comments on commit 8b54cba

Please sign in to comment.