Add support for concurrent index block queries #1195
Conversation
Codecov Report
@@            Coverage Diff            @@
##           master   #1195     +/-   ##
=========================================
- Coverage     71.1%     71%    -0.1%
=========================================
  Files          739     739
  Lines        62065   62206     +141
=========================================
+ Hits         44132   44197      +65
- Misses       15083   15143      +60
- Partials      2850    2866      +16
Continue to review full report at Codecov.
@@ -38,6 +38,10 @@ const (
	// DefaultBootstrapConsistencyLevel is the default bootstrap consistency level
	DefaultBootstrapConsistencyLevel = topology.ReadConsistencyLevelMajority

	// DefaultIndexDefaultQueryTimeout is the hard timeout value to use if none is
	// specified for a specific query, zero specifies no timeout.
	DefaultIndexDefaultQueryTimeout = time.Minute
What value are you seeing in prod? I would have thought 15-30s would be a more reasonable default.
Yeah, it could be; I just don't necessarily want to break some users' existing queries. I could be convinced to use 30s instead, though, for sure.
Could you file an issue for this to be settable from config?
Since we don't have good support for setting runtime values in OSS.
Yup, I opened here:
#1220
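For reference, a rough sketch of how this could eventually be wired through config; the struct and field names below are assumptions for illustration only, not the actual m3dbnode configuration schema:

// Hypothetical sketch only: names are placeholders, not the real config schema.
type IndexConfiguration struct {
	// DefaultQueryTimeout, if set, overrides DefaultIndexDefaultQueryTimeout.
	// A zero duration would mean no timeout.
	DefaultQueryTimeout *time.Duration `yaml:"defaultQueryTimeout"`
}

func (c IndexConfiguration) QueryTimeoutOrDefault() time.Duration {
	if c.DefaultQueryTimeout != nil {
		return *c.DefaultQueryTimeout
	}
	return DefaultIndexDefaultQueryTimeout
}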
Haven't finished reviewing everything yet but I need to run right now, will finish review later
	return index.QueryResults{}, errDbIndexUnableToQueryClosed
}

// override query response limit if needed.
// Track this as an inflight query that needs to finish
Super nit: period at the end of all these comments, I've been trying to fix these as we go
Done.
src/dbnode/storage/index.go
Outdated
	}, nil
}

func (i *nsIndex) timeoutForQueryWithLock(
withRLock for consistency with our naming conventions elsewhere
src/dbnode/storage/index.go
Outdated
	return i.state.runtimeOpts.defaultQueryTimeout
}

func (i *nsIndex) overriddenOptsForQueryWithLock(
withRLock for consistency with our naming conventions elsewhere
src/dbnode/storage/index.go
Outdated
	results.Reset(i.nsMetadata.ID())
	ctx.RegisterFinalizer(results)

func (i *nsIndex) blocksForQueryWithLock(queryRange xtime.Ranges) ([]index.Block, error) {
withRLock for consistency with our naming conventions elsewhere
src/dbnode/storage/index.go
Outdated
	}

	var (
		start = i.nowFn()
This doesn't depend on the lock, right? You can probably move this above the RLock and then still set the deadline down here.
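Presumably something along these lines; a sketch only, with names abbreviated from the PR (timeoutForQueryWithRLock assumes the rename suggested above):

// Capture the start time before taking the read lock, since nowFn does not
// depend on index state; compute the deadline under the lock once the
// per-query timeout is known.
start := i.nowFn()

i.state.RLock()
defer i.state.RUnlock()

timeout := i.timeoutForQueryWithRLock(opts)
var deadline time.Time
if timeout > 0 {
	deadline = start.Add(timeout)
}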
	}

	// ensure the block has data requested by the query
	if queryRange.IsEmpty() {
nit: This might be a little less confusing if it was the first part of the loop. As is, I was confused about why you would need to check this after doing the block lookup, but it's not related to that at all.
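In other words, roughly the following (a sketch; the identifiers are placeholders rather than the actual fields in the PR):

// Check the remaining query range at the top of each iteration, before the
// block lookup, so the early exit reads as unrelated to the lookup itself.
for _, blockStart := range blockStarts {
	if queryRange.IsEmpty() {
		break // nothing left that the query needs
	}
	block, ok := blocksByStart[blockStart]
	if !ok {
		continue
	}
	blocks = append(blocks, block)
}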
src/dbnode/storage/index.go
Outdated
	}
}

func (i *nsIndex) newConcurrentResults(ctx context.Context) *index.ConcurrentResults {
	results := i.opts.IndexOptions().ResultsPool().Get()
want to just throw this pool onto the struct itself like we do with the worker pool so you don't have to do the triple function call?
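Something like the following, mirroring how the worker pool is already kept on the struct (a sketch; the resultsPool field is an assumption, not code from the PR):

type nsIndex struct {
	// ...existing fields elided...
	resultsPool index.ResultsPool // set once at construction from opts.IndexOptions().ResultsPool()
}

func (i *nsIndex) newConcurrentResults(ctx context.Context) *index.ConcurrentResults {
	results := i.resultsPool.Get()
	results.Reset(i.nsMetadata.ID())
	ctx.RegisterFinalizer(results)
	// ...rest as in the PR...
}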
src/dbnode/storage/index.go
Outdated
	i.state.closed = true

	var multiErr xerrors.MultiError
	multiErr = multiErr.Add(i.state.insertQueue.Stop())

	// Wait for inflight queries to finish before closing blocks
	i.queriesWg.Wait()
Might be safer (or less likely to deadlock when changes are made) if you do this wait outside the lock. So:
i.state.Lock()
defer i.state.Unlock()

if !i.isOpenWithRLock() {
	return errDbIndexAlreadyClosed
}

// Signal our intent to close so that we stop accepting reads / writes.
i.state.closed = true
i.state.Unlock()

// Wait for in-flight queries to finish before continuing.
i.queriesWg.Wait()

// Reacquire the lock so we can complete the shutdown.
// Not even sure you need to re-acquire the lock at this point if you're willing
// to remove the defer at the top, since technically no one else can call Close()
// after you mark it closed.
i.state.Lock()

var multiErr xerrors.MultiError
multiErr = multiErr.Add(i.state.insertQueue.Stop())
...
You understand the lifecycle better than I do, so take it or leave it
	size = results.Size()
	brokeEarly = false
)
size := results.Size()
wanna add the var back? I think it was cleaner. I assume you removed it unintentionally while refactoring
src/dbnode/storage/index/block.go
Outdated
	// we only retrieve the results lock when we add a batch of documents
	// to the results set.
	batch := b.docsPool.Get()
	// Use documentArrayPoolCapacity to as max batch to avoid growing outside
"Use the maximum capacity of the array pool as the max batch size to avoid growing outside the allowed pool capacity."
src/dbnode/storage/index/block.go
Outdated
	}()

	// NB(r): This query method is only called once per block so it is
	// relatively cheap to declare as a lambda.
	flushBatch := func() error {
could you eliminate the allocation entirely if you had a named function / method with the signature:
func flushBatch(batch []Document, results *ConcurrentResults) ([]Document, error)
and then you could use it like:
batch = append(batch, d)
if len(batch) < maxBatch {
	continue
}
batch, err = flushBatch(batch, results)
if err != nil {
	...
Maybe overkill, but the anonymous func doesn't seem like it's closing over many vars.
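A minimal sketch of what that named helper could look like; the AddDocuments call is an assumed method on ConcurrentResults, used only to illustrate the shape:

func flushBatch(
	batch []doc.Document,
	results *index.ConcurrentResults,
) ([]doc.Document, error) {
	if len(batch) == 0 {
		return batch, nil
	}
	// One results-lock acquisition per batch rather than per document.
	if err := results.AddDocuments(batch); err != nil {
		return batch, err
	}
	// Reuse the backing array for the next batch.
	return batch[:0], nil
}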
	brokeEarly = false
)
size := results.Size()
limitedResults := false
Can you just call this exhaustive and return it at the end, instead of doing the limitedResults -> exhaustive translation?
Reads worse, it was actually like this before.
// documentArrayPool size in general: 256*256*sizeof(doc.Document)
// = 256 * 256 * 16
// = 1mb (but with Go's heap probably 2mb)
// TODO(r): Make this configurable in a followup change.
We're probably gonna want this tunable really soon so we don't have to ask people to recompile when they're testing our changes, especially if their read workloads are very different from ours
This is now only going to be used by the compactor, so it should be tiny (we'll never have 256 concurrent compactions, and the compactor just uses a single pool).
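If it does end up needing to be tunable, the usual options pattern would presumably cover it; a sketch with assumed method names, not an existing API:

type Options interface {
	// SetDocumentArrayPoolOptions sets the pool sizing used for per-query
	// document batches.
	SetDocumentArrayPoolOptions(value pool.ObjectPoolOptions) Options

	// DocumentArrayPoolOptions returns the pool sizing used for per-query
	// document batches.
	DocumentArrayPoolOptions() pool.ObjectPoolOptions
}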
	Name: r.copyBytes(f.Name),
	Value: r.copyBytes(f.Value),
})
tags.Append(r.idPool.CloneTag(ident.Tag{
is this just a cleanup?
Yup, this wasn't being pooled before, now it is (it always could have been).
src/dbnode/storage/index.go
Outdated
	workers = i.opts.QueryIDsWorkerPool()
	wg sync.WaitGroup

	// results contains all concurrent mutalbe state below
mutable
	// queriesWg tracks outstanding queries to ensure
	// we wait for all queries to complete before actually closing
	// blocks and other cleanup tasks on index close
super nit: Period.
	i.queriesWg.Add(1)
	defer i.queriesWg.Done()

	// Enact overrides for query options
super nit: period
	}

	var (
		ticker = time.NewTicker(timeLeft)
I know Prateek has talked about time tickers not getting scheduled properly in certain situations, and preferring loops with time.Sleep, which apparently has some special hooks into the runtime. Although then you'd have to implement polling, so ehhhh. Probably fine as is.
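For comparison, the sleep-based polling alternative would look roughly like this (a sketch only; doneCh, deadline, and the error name are placeholders, not code from the PR):

// Poll on a short sleep instead of relying on ticker scheduling: wake up,
// check whether the query finished or the deadline passed, then sleep again.
const pollInterval = 10 * time.Millisecond
for {
	select {
	case <-doneCh:
		return nil // query completed
	default:
	}
	if i.nowFn().After(deadline) {
		return errDbIndexQueryTimedOut // placeholder error name
	}
	time.Sleep(pollInterval)
}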
@@ -905,18 +1138,19 @@ func (i *nsIndex) CleanupExpiredFileSets(t time.Time) error {

func (i *nsIndex) Close() error {
Is there not a "Tick()" loop? How does the ticking get stopped?
There is a tick loop, yeah; see Tick() on nsIndex.