Use a single index Results when querying across blocks #1474

Merged
2 changes: 1 addition & 1 deletion src/dbnode/integration/fetch_tagged_quorum_test.go
@@ -295,7 +295,7 @@ func makeTestFetchTagged(
 		index.QueryOptions{
 			StartInclusive: startTime.Add(-time.Minute),
 			EndExclusive:   startTime.Add(time.Minute),
-			Limit:          1,
+			Limit:          100,
 		})
 }

16 changes: 8 additions & 8 deletions src/dbnode/network/server/tchannelthrift/node/service_test.go
@@ -191,8 +191,8 @@ func TestServiceQuery(t *testing.T) {
 	require.NoError(t, err)
 	qry := index.Query{Query: req}

-	resMap := index.NewResults(testIndexOptions)
-	resMap.Reset(ident.StringID(nsID))
+	resMap := index.NewResults(ident.StringID(nsID),
+		index.ResultsOptions{}, testIndexOptions)
 	resMap.Map().Set(ident.StringID("foo"), ident.NewTags(
 		ident.StringTag(tags["foo"][0].name, tags["foo"][0].value),
 		ident.StringTag(tags["foo"][1].name, tags["foo"][1].value),
@@ -1064,8 +1064,8 @@ func TestServiceFetchTagged(t *testing.T) {
 	require.NoError(t, err)
 	qry := index.Query{Query: req}

-	resMap := index.NewResults(testIndexOptions)
-	resMap.Reset(ident.StringID(nsID))
+	resMap := index.NewResults(ident.StringID(nsID),
+		index.ResultsOptions{}, testIndexOptions)
 	resMap.Map().Set(ident.StringID("foo"), ident.NewTags(
 		ident.StringTag("foo", "bar"),
 		ident.StringTag("baz", "dxk"),
@@ -1161,8 +1161,8 @@ func TestServiceFetchTaggedIsOverloaded(t *testing.T) {
 	req, err := idx.NewRegexpQuery([]byte("foo"), []byte("b.*"))
 	require.NoError(t, err)

-	resMap := index.NewResults(testIndexOptions)
-	resMap.Reset(ident.StringID(nsID))
+	resMap := index.NewResults(ident.StringID(nsID),
+		index.ResultsOptions{}, testIndexOptions)
 	resMap.Map().Set(ident.StringID("foo"), ident.NewTags(
 		ident.StringTag("foo", "bar"),
 		ident.StringTag("baz", "dxk"),
@@ -1214,8 +1214,8 @@ func TestServiceFetchTaggedNoData(t *testing.T) {
 	require.NoError(t, err)
 	qry := index.Query{Query: req}

-	resMap := index.NewResults(testIndexOptions)
-	resMap.Reset(ident.StringID(nsID))
+	resMap := index.NewResults(ident.StringID(nsID),
+		index.ResultsOptions{}, testIndexOptions)
 	resMap.Map().Set(ident.StringID("foo"), ident.Tags{})
 	resMap.Map().Set(ident.StringID("bar"), ident.Tags{})
 	mockDB.EXPECT().QueryIDs(
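Each of the four test hunks above makes the same substitution, which is the API change at the heart of this PR: the namespace ID and a ResultsOptions value move into the constructor, replacing the separate Reset call. A minimal before/after sketch inferred from the signatures visible in this diff (not quoted from the source); the SizeLimit field mentioned in the comment is how the later index.go changes cap accumulated results:

```go
// Before this PR: construct, then reset with the namespace.
//   resMap := index.NewResults(testIndexOptions)
//   resMap.Reset(ident.StringID(nsID))

// After this PR: namespace and per-use options are supplied up front.
// A zero-value ResultsOptions leaves the size limit unset; index.go passes
// index.ResultsOptions{SizeLimit: opts.Limit} to bound accumulated results.
resMap := index.NewResults(
    ident.StringID(nsID),   // namespace the results are scoped to
    index.ResultsOptions{}, // per-use options, e.g. SizeLimit
    testIndexOptions,       // pools and other long-lived index options
)
```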
17 changes: 10 additions & 7 deletions src/dbnode/server/server.go
@@ -1199,12 +1199,11 @@ func withEncodingAndPoolingOptions(
 		SetBytesPool(bytesPool).
 		SetIdentifierPool(identifierPool))

-	var (
-		resultsPool = index.NewResultsPool(
-			poolOptions(policy.IndexResultsPool, scope.SubScope("index-results-pool")))
-		postingsListOpts = poolOptions(policy.PostingsListPool, scope.SubScope("postingslist-pool"))
-		postingsList     = postings.NewPool(postingsListOpts, roaring.NewPostingsList)
-	)
+	postingsListOpts := poolOptions(policy.PostingsListPool, scope.SubScope("postingslist-pool"))
+	postingsList := postings.NewPool(postingsListOpts, roaring.NewPostingsList)
+
+	resultsPool := index.NewResultsPool(
+		poolOptions(policy.IndexResultsPool, scope.SubScope("index-results-pool")))

 	indexOpts := opts.IndexOptions().
 		SetInstrumentOptions(iopts).
@@ -1223,7 +1222,11 @@
 		SetCheckedBytesPool(bytesPool).
 		SetResultsPool(resultsPool)

-	resultsPool.Init(func() index.Results { return index.NewResults(indexOpts) })
+	resultsPool.Init(func() index.Results {
+		// NB(r): Need to initialize after setting the index opts so
+		// it seems the same reference of the options as is set
Contributor: This sentence reads like word salad lol. I think you were trying to say: "so it sees the same reference to the indexOptions"

+		return index.NewResults(nil, index.ResultsOptions{}, indexOpts)
+	})

 	return opts.SetIndexOptions(indexOpts)
 }
111 changes: 29 additions & 82 deletions src/dbnode/storage/index.go
@@ -44,6 +44,7 @@ import (
 	m3ninxindex "github.com/m3db/m3/src/m3ninx/index"
 	"github.com/m3db/m3/src/m3ninx/index/segment"
 	"github.com/m3db/m3/src/m3ninx/index/segment/builder"
+	"github.com/m3db/m3/src/x/resource"
 	xclose "github.com/m3db/m3x/close"
 	"github.com/m3db/m3x/context"
 	xerrors "github.com/m3db/m3x/errors"
@@ -887,32 +888,29 @@ func (i *nsIndex) Query(
 		deadline = start.Add(timeout)
 		wg       sync.WaitGroup

-		// Results contains all concurrent mutable state below.
-		results = struct {
+		// State contains concurrent mutable state for async execution below.
+		state = struct {
 			sync.Mutex
 			multiErr   xerrors.MultiError
-			merged     index.Results
 			exhaustive bool
-			returned   bool
 		}{
-			merged:     nil,
 			exhaustive: true,
-			returned:   false,
 		}
 	)
-	defer func() {
-		// Ensure that during early error returns we let any aborted
-		// goroutines know not to try to modify/edit the result any longer.
-		results.Lock()
-		results.returned = true
-		results.Unlock()
-	}()
-
-	execBlockQuery := func(block index.Block) {
-		blockResults := i.resultsPool.Get()
-		blockResults.Reset(i.nsMetadata.ID())
+	// Get results and set the namespace ID and size limit.
+	results := i.resultsPool.Get()
+	results.Reset(i.nsMetadata.ID(), index.ResultsOptions{
+		SizeLimit: opts.Limit,
+	})

+	// Create a cancellable lifetime and cancel it at end of this method so that
+	// no child async task modifies the result after this method returns.
+	cancellable := resource.NewCancellableLifetime()
+	defer cancellable.Cancel()

-		blockExhaustive, err := block.Query(query, opts, blockResults)
+	execBlockQuery := func(block index.Block) {
+		blockExhaustive, err := block.Query(cancellable, query, opts, results)
 		if err == index.ErrUnableToQueryBlockClosed {
 			// NB(r): Because we query this block outside of the results lock, it's
 			// possible this block may get closed if it slides out of retention, in

-		var mergedResult bool
-		results.Lock()
-		defer func() {
-			results.Unlock()
-			if mergedResult {
-				// Only finalize this result if we merged it into another.
-				blockResults.Finalize()
-			}
-		}()
-
-		if results.returned {
-			// If already returned then we early cancelled, don't add any
-			// further results or errors since caller already has a result.
-			return
-		}
+		state.Lock()
+		defer state.Unlock()

 		if err != nil {
-			results.multiErr = results.multiErr.Add(err)
+			state.multiErr = state.multiErr.Add(err)
 			return
 		}

-		if results.merged == nil {
-			// Return results to pool at end of request.
-			ctx.RegisterFinalizer(blockResults)
-			// No merged results yet, use this as the first to merge into.
-			results.merged = blockResults
-		} else {
-			// Append the block results.
-			mergedResult = true
-			size := results.merged.Size()
-			for _, entry := range blockResults.Map().Iter() {
-				// Break early if reached limit.
-				if opts.Limit > 0 && size >= opts.Limit {
-					blockExhaustive = false
-					break
-				}
-
-				// Append to merged results.
-				id, tags := entry.Key(), entry.Value()
-				_, size, err = results.merged.AddIDAndTags(id, tags)
-				if err != nil {
-					results.multiErr = results.multiErr.Add(err)
-					return
-				}
-			}
-		}
-
 		// If block had more data but we stopped early, need to notify caller.
Contributor: super nit: This would read a lot better if you moved this comment below the early return, because right now it seems like a bug at first glance: you say we need to notify the caller, but then the code immediately returns without doing anything.

Collaborator (author): Sounds good, will do.

 		if blockExhaustive {
 			return
 		}
-		results.exhaustive = false
+		state.exhaustive = false
 	}

 	for _, block := range blocks {
 		// Capture block for async query execution below.
 		block := block

 		// Terminate early if we know we don't need any more results.
Contributor: Took me a second to understand this, could you clarify the comment to something like:

"We're looping through all the blocks that we need to query and kicking off parallel queries, which are bounded by the queryWorkersPool's maximum concurrency. This means it's possible at this point that we've already completed querying one or more blocks and exhausted the maximum number of results we're allowed to return. If that's the case, there is no value in kicking off more parallel queries, so we break out of the loop."

I know it's a little verbose, but I've seen multiple iterations of this code already, and other people coming in with fresh eyes will probably have more trouble following it.

Collaborator (author): Thank you for writing the comment, will do - hah 👍
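For readers following along, here is a schematic sketch of the dispatch loop that comment describes. The names mirror the surrounding diff (queryWorkersPool is the bounded worker pool the reviewer mentions), but error handling and the timeout path are elided, so treat it as an illustration rather than the verbatim implementation:

```go
for _, block := range blocks {
    block := block // capture the loop variable for the closure below

    // Earlier parallel queries may already have filled the shared
    // results up to the limit; if so, stop dispatching more blocks.
    if opts.LimitExceeded(results.Size()) {
        state.Lock()
        state.exhaustive = false
        state.Unlock()
        break
    }

    wg.Add(1)
    queryWorkersPool.Go(func() {
        defer wg.Done()
        execBlockQuery(block)
    })
}
wg.Wait() // all dispatched block queries have finished (or were cancelled)
```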

-		results.Lock()
-		mergedSize := 0
-		if results.merged != nil {
-			mergedSize = results.merged.Size()
-		}
-		alreadyNotExhaustive := opts.Limit > 0 && mergedSize >= opts.Limit
+		size := results.Size()
+		alreadyNotExhaustive := opts.LimitExceeded(size)
Collaborator: nit: can this be renamed to alreadyExceededLimit?

Collaborator (author): Good call.
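LimitExceeded replaces the inline `opts.Limit > 0 && mergedSize >= opts.Limit` check from the deleted lines above. A plausible definition, inferred from that inline expression rather than quoted from the source:

```go
// LimitExceeded returns whether a result size exceeds the query limit,
// where a limit of zero means "no limit". Inferred from the inline
// check this PR replaces; not the verbatim implementation.
func (o QueryOptions) LimitExceeded(size int) bool {
    return o.Limit > 0 && size >= o.Limit
}
```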

 		if alreadyNotExhaustive {
-			results.exhaustive = false
+			state.Lock()
+			state.exhaustive = false
+			state.Unlock()
 		}
-		results.Unlock()

 		if alreadyNotExhaustive {
Contributor: This is a little weird, can you just move the break into the previous conditional block that has the same condition?

Collaborator (author): Sure thing, this is actually a side effect of the refactor, good catch.

 			// Break out if already exhaustive.
@@ -1063,29 +1019,20 @@ func (i *nsIndex) Query(
 		}
 	}

-	results.Lock()
-	// Signal not to add any further results since we've returned already.
-	results.returned = true
Contributor: Are we losing the ability to do this signaling? Seems like this might make the impact of timed-out queries even worse, since they'll keep updating this map... Actually, isn't it broken, because they'll keep trying to add results to a map that may have been returned to the pool?

Contributor: Actually, where do the results get returned to the pool? I don't see it in this method or in the RPC method.

Regardless, if we want to pool this thing you may need to add ref-counting or some type of unique query identifier or something.

Collaborator (author): So I now use a lifetime to protect against writing to the results once we return from the Query call to the index; this prevents writing to results during cancellation or any other early-return code path.
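The lifetime pattern the author describes uses the resource.CancellableLifetime calls visible elsewhere in this diff (NewCancellableLifetime, TryCheckout, ReleaseCheckout, Cancel). A sketch of how the pieces fit together, assuming (as the diff implies but does not show) that Cancel cannot complete while a checkout is held:

```go
// In Query: create a lifetime and cancel it on return, so that once the
// caller has the results, no async block query can mutate them.
cancellable := resource.NewCancellableLifetime()
defer cancellable.Cancel()

// In each concurrent block query, around every write to the shared results:
if !cancellable.TryCheckout() {
    return errCancelledQuery // Query already returned; drop these results
}
_, err := results.AddDocuments(batch) // safe while the checkout is held
cancellable.ReleaseCheckout()
```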

+	state.Lock()
-	// Take reference to vars to return while locked, need to allow defer
Contributor: I don't understand this comment. What deadlock are you protecting against? The only Lock/defer that I see is in the execBlockQuery func, and I don't see how that would deadlock with this code.

Collaborator (author): I'll remove that statement; there used to be other stuff going on.

-	// lock/unlock cleanup to not deadlock with this locked code block.
-	exhaustive := results.exhaustive
-	mergedResults := results.merged
-	err = results.multiErr.FinalError()
-	results.Unlock()
+	exhaustive := state.exhaustive
+	err = state.multiErr.FinalError()
+	state.Unlock()

 	if err != nil {
 		return index.QueryResults{}, err
 	}

-	// If no blocks queried, return an empty result
-	if mergedResults == nil {
-		mergedResults = i.resultsPool.Get()
-		mergedResults.Reset(i.nsMetadata.ID())
-	}

 	return index.QueryResults{
 		Exhaustive: exhaustive,
-		Results:    mergedResults,
+		Results:    results,
 	}, nil
 }

60 changes: 55 additions & 5 deletions src/dbnode/storage/index/block.go
@@ -37,6 +37,7 @@ import (
 	"github.com/m3db/m3/src/m3ninx/index/segment/fst"
 	"github.com/m3db/m3/src/m3ninx/search"
 	"github.com/m3db/m3/src/m3ninx/search/executor"
+	"github.com/m3db/m3/src/x/resource"
 	"github.com/m3db/m3x/context"
 	xerrors "github.com/m3db/m3x/errors"
 	"github.com/m3db/m3x/instrument"
@@ -61,6 +62,7 @@ var (
 	errForegroundCompactorNoPlan               = errors.New("index foreground compactor failed to generate a plan")
 	errForegroundCompactorBadPlanFirstTask     = errors.New("index foreground compactor generated plan without mutable segment in first task")
 	errForegroundCompactorBadPlanSecondaryTask = errors.New("index foreground compactor generated plan with mutable segment a secondary task")
+	errCancelledQuery                          = errors.New("query was cancelled")

 	errUnableToSealBlockIllegalStateFmtString  = "unable to seal, index block state: %v"
 	errUnableToWriteBlockUnknownStateFmtString = "unable to write, unknown index block state: %v"
@@ -73,6 +75,8 @@
 	blockStateSealed
 	blockStateClosed

+	defaultQueryDocsBatchSize = 256
+
 	compactDebugLogEvery = 1 // Emit debug log for every compaction
 )

@@ -749,6 +753,7 @@ func (b *block) executorWithRLock() (search.Executor, error) {
 }

 func (b *block) Query(
+	cancellable *resource.CancellableLifetime,
 	query Query,
 	opts QueryOptions,
 	results Results,
@@ -772,23 +777,39 @@
 	}

 	size := results.Size()
-	limitedResults := false
+	batch := b.docsPool.Get()
+	batchSize := cap(batch)
+	if batchSize == 0 {
+		batchSize = defaultQueryDocsBatchSize
+	}
 	iterCloser := safeCloser{closable: iter}
 	execCloser := safeCloser{closable: exec}

 	defer func() {
+		b.docsPool.Put(batch)
Collaborator: Do we need to reset the batch here?

Collaborator (author): No, thankfully the Put method on the array pool will zero out each elem and also resize it to zero.
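A hedged sketch of the pool behavior the author is describing: a slice pool whose Put zeroes each element (so the pooled backing array does not pin documents for the GC) and truncates the slice to length zero before reuse. The types and import paths here are simplified assumptions; the real docs pool lives in m3ninx and may differ:

```go
import (
    "sync"

    "github.com/m3db/m3/src/m3ninx/doc" // assumed path for doc.Document
)

type docsSlicePool struct{ pool sync.Pool }

func (p *docsSlicePool) Put(batch []doc.Document) {
    var empty doc.Document
    for i := range batch {
        batch[i] = empty // drop references so documents can be collected
    }
    p.pool.Put(batch[:0]) // hand back a zero-length slice for reuse
}

func (p *docsSlicePool) Get() []doc.Document {
    if b, ok := p.pool.Get().([]doc.Document); ok {
        return b
    }
    return nil // caller falls back to defaultQueryDocsBatchSize
}
```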

 		iterCloser.Close()
 		execCloser.Close()
 	}()

 	for iter.Next() {
 		if opts.LimitExceeded(size) {
-			limitedResults = true
 			break
 		}

-		d := iter.Current()
-		_, size, err = results.AddDocument(d)
+		batch = append(batch, iter.Current())
+		if len(batch) < batchSize {
+			continue
+		}
+
+		batch, size, err = b.addQueryResults(cancellable, results, batch)
 		if err != nil {
 			return false, err
 		}
 	}

+	// Add last batch to results if remaining.
+	if len(batch) > 0 {
+		batch, size, err = b.addQueryResults(cancellable, results, batch)
+		if err != nil {
+			return false, err
+		}
@@ -806,10 +827,39 @@
 		return false, err
 	}

-	exhaustive := !limitedResults
+	exhaustive := !opts.LimitExceeded(size)
 	return exhaustive, nil
 }

+func (b *block) addQueryResults(
+	cancellable *resource.CancellableLifetime,
+	results Results,
+	batch []doc.Document,
+) ([]doc.Document, int, error) {
+	// Checkout the lifetime of the query before adding results
+	queryValid := cancellable.TryCheckout()
+	if !queryValid {
+		// Query not valid any longer, do not add results and return early
+		return batch, 0, errCancelledQuery
Contributor: Hmm, this error will still get propagated up to the level above and stored in the state multiErr... I guess that's fine though, since you take a copy of the multiErr when you return.

Collaborator (author): Yeah, we always lock when we access multiErr.

+	}
+
+	// Try to add the docs to the resource
+	size, err := results.AddDocuments(batch)
+
+	// Immediately release the checkout on the lifetime of query
+	cancellable.ReleaseCheckout()
+
+	// Reset batch
+	var emptyDoc doc.Document
+	for i := range batch {
+		batch[i] = emptyDoc
+	}
+	batch = batch[:0]
+
+	// Return results
+	return batch, size, err
+}

 func (b *block) AddResults(
 	results result.IndexBlock,
 ) error {