-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use a single index Results when querying across blocks #1474
Changes from 1 commit
274bae1
f05ce65
760b41f
57fe7ae
c94cb18
6f434a2
c04b5e0
fa8331b
2f5db2d
361747f
fe32fc2
534de74
98205b5
28e78d1
672886a
ff46b45
a8f8a2d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -895,24 +895,16 @@ func (i *nsIndex) Query( | |
exhaustive bool | ||
returned bool | ||
}{ | ||
merged: nil, | ||
merged: i.resultsPool.Get(), | ||
exhaustive: true, | ||
returned: false, | ||
} | ||
) | ||
defer func() { | ||
// Ensure that during early error returns we let any aborted | ||
// goroutines know not to try to modify/edit the result any longer. | ||
results.Lock() | ||
results.returned = true | ||
results.Unlock() | ||
}() | ||
// Set the results namespace ID | ||
results.merged.Reset(i.nsMetadata.ID()) | ||
|
||
execBlockQuery := func(block index.Block) { | ||
blockResults := i.resultsPool.Get() | ||
blockResults.Reset(i.nsMetadata.ID()) | ||
|
||
blockExhaustive, err := block.Query(query, opts, blockResults) | ||
blockExhaustive, err := block.Query(query, opts, results.merged) | ||
if err == index.ErrUnableToQueryBlockClosed { | ||
// NB(r): Because we query this block outside of the results lock, it's | ||
// possible this block may get closed if it slides out of retention, in | ||
|
@@ -921,53 +913,14 @@ func (i *nsIndex) Query( | |
err = nil | ||
} | ||
|
||
var mergedResult bool | ||
results.Lock() | ||
defer func() { | ||
results.Unlock() | ||
if mergedResult { | ||
// Only finalize this result if we merged it into another. | ||
blockResults.Finalize() | ||
} | ||
}() | ||
|
||
if results.returned { | ||
// If already returned then we early cancelled, don't add any | ||
// further results or errors since caller already has a result. | ||
return | ||
} | ||
defer results.Unlock() | ||
|
||
if err != nil { | ||
results.multiErr = results.multiErr.Add(err) | ||
return | ||
} | ||
|
||
if results.merged == nil { | ||
// Return results to pool at end of request. | ||
ctx.RegisterFinalizer(blockResults) | ||
// No merged results yet, use this as the first to merge into. | ||
results.merged = blockResults | ||
} else { | ||
// Append the block results. | ||
mergedResult = true | ||
size := results.merged.Size() | ||
for _, entry := range blockResults.Map().Iter() { | ||
// Break early if reached limit. | ||
if opts.Limit > 0 && size >= opts.Limit { | ||
blockExhaustive = false | ||
break | ||
} | ||
|
||
// Append to merged results. | ||
id, tags := entry.Key(), entry.Value() | ||
_, size, err = results.merged.AddIDAndTags(id, tags) | ||
if err != nil { | ||
results.multiErr = results.multiErr.Add(err) | ||
return | ||
} | ||
} | ||
} | ||
|
||
// If block had more data but we stopped early, need to notify caller. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. super nit: This would read a lot better if you move this comment below the early return cause right now it seems like a bug at first glance cause you say need to notify the caller but then the code immediately returns without doing anything There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good, will do. |
||
if blockExhaustive { | ||
return | ||
|
@@ -1064,8 +1017,6 @@ func (i *nsIndex) Query( | |
} | ||
|
||
results.Lock() | ||
// Signal not to add any further results since we've returned already. | ||
results.returned = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we losing the ability to do this signaling? Seems like this might make the impact of timed out queries even worse since they'll keep updating this map...Actually isn't it broken because they'll keep trying to add results to a map that may have been returned to the pool? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, where do the results get returned to the pool? I don't see it in this method or in the RPC method. Regardless, if we want to pool this thing you may need to add ref-counting or some type of unique query identifier or something There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I now use a lifetime to protect against writing to the results once we return from the Query call to the index, this prevents writing to results during cancellation or any other early return code path. |
||
// Take reference to vars to return while locked, need to allow defer | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dont understand this comment. What deadlock are you protecting against? The only Lock/Defer that is see is in the execBlockQuery func and I don't see how that would deadlock with this code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll remove that statement, there used to be other stuff going on. |
||
// lock/unlock cleanup to not deadlock with this locked code block. | ||
exhaustive := results.exhaustive | ||
|
@@ -1077,12 +1028,6 @@ func (i *nsIndex) Query( | |
return index.QueryResults{}, err | ||
} | ||
|
||
// If no blocks queried, return an empty result | ||
if mergedResults == nil { | ||
mergedResults = i.resultsPool.Get() | ||
mergedResults.Reset(i.nsMetadata.ID()) | ||
} | ||
|
||
return index.QueryResults{ | ||
Exhaustive: exhaustive, | ||
Results: mergedResults, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to call this merged now that we push it all the way down? Maybe just call it results
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing, renamed.