-
-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rewrite FindProvidersAsync #440
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -138,43 +138,78 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { | |
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer { | ||
log.Event(ctx, "findProviders", &key) | ||
peerOut := make(chan peer.Peer, count) | ||
go func() { | ||
defer close(peerOut) | ||
|
||
ps := newPeerSet() | ||
// TODO may want to make this function async to hide latency | ||
provs := dht.providers.GetProviders(ctx, key) | ||
for _, p := range provs { | ||
count-- | ||
// NOTE: assuming that this list of peers is unique | ||
ps.Add(p) | ||
go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut) | ||
return peerOut | ||
} | ||
|
||
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.Peer) { | ||
defer close(peerOut) | ||
|
||
ps := newPeerSet() | ||
provs := dht.providers.GetProviders(ctx, key) | ||
for _, p := range provs { | ||
// NOTE: assuming that this list of peers is unique | ||
if ps.AddIfSmallerThan(p, count) { | ||
select { | ||
case peerOut <- p: | ||
case <-ctx.Done(): | ||
return | ||
} | ||
if count <= 0 { | ||
return | ||
} | ||
|
||
// If we have enough peers locally, dont bother with remote RPC | ||
if ps.Size() >= count { | ||
return | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. afaict, this is an optimization in case we have enough locally, right? maybe comment it so? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thats exactly what it is, since the call asks for a certain number of providers, we dont ever want to send more than they expect. |
||
|
||
// setup the Query | ||
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { | ||
|
||
pmes, err := dht.findProvidersSingle(ctx, p, key, 0) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
provs, errs := pb.PBPeersToPeers(dht.peerstore, pmes.GetProviderPeers()) | ||
for _, err := range errs { | ||
if err != nil { | ||
log.Warning(err) | ||
} | ||
} | ||
|
||
var wg sync.WaitGroup | ||
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) | ||
for _, pp := range peers { | ||
wg.Add(1) | ||
go func(p peer.Peer) { | ||
defer wg.Done() | ||
pmes, err := dht.findProvidersSingle(ctx, p, key, 0) | ||
if err != nil { | ||
log.Error(err) | ||
return | ||
// Add unique providers from request, up to 'count' | ||
for _, prov := range provs { | ||
if ps.AddIfSmallerThan(prov, count) { | ||
select { | ||
case peerOut <- prov: | ||
case <-ctx.Done(): | ||
log.Error("Context timed out sending more providers") | ||
return nil, ctx.Err() | ||
} | ||
dht.addPeerListAsync(ctx, key, pmes.GetProviderPeers(), ps, count, peerOut) | ||
}(pp) | ||
} | ||
if ps.Size() >= count { | ||
return &dhtQueryResult{success: true}, nil | ||
} | ||
} | ||
wg.Wait() | ||
}() | ||
return peerOut | ||
|
||
// Give closer peers back to the query to be queried | ||
closer := pmes.GetCloserPeers() | ||
clpeers, errs := pb.PBPeersToPeers(dht.peerstore, closer) | ||
for _, err := range errs { | ||
if err != nil { | ||
log.Warning(err) | ||
} | ||
} | ||
|
||
return &dhtQueryResult{closerPeers: clpeers}, nil | ||
}) | ||
|
||
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, i can switch over to a single routing table in this PR if youre okay with that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes pls There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually, just merge this and another PR. small PRs = easy to CR |
||
_, err := query.Run(ctx, peers) | ||
if err != nil { | ||
log.Errorf("FindProviders Query error: %s", err) | ||
} | ||
} | ||
|
||
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if this assumption isn't true? could it make it hang?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nah, it wont hang, we just might send a request to the same peer twice. But im very certain that GetProviders from our provider manager returns a unique list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
easy to do the
ps.Contains
check here too.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that it is... i think not doing that is a result of my shitty-programmer-copy-paste-itis