Skip to content

Commit

Permalink
[dbnode] Aggregate() using only FSTs where possible (#1545)
Browse files Browse the repository at this point in the history
  • Loading branch information
prateek authored Apr 23, 2019
1 parent d889d3e commit 0185c0e
Show file tree
Hide file tree
Showing 20 changed files with 2,694 additions and 305 deletions.
17 changes: 16 additions & 1 deletion src/dbnode/generated-source-files.mk
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ genny-map-storage-index-aggregation-results: genny-map-storage-index-aggregate-v

# generation rule for all generated arraypools
.PHONY: genny-arraypool-all
genny-arraypool-all: genny-arraypool-node-segments
genny-arraypool-all: \
genny-arraypool-node-segments \
genny-arraypool-aggregate-results-entry \

# arraypool generation rule for ./network/server/tchannelthrift/node/segmentsArrayPool
.PHONY: genny-arraypool-node-segments
Expand All @@ -186,6 +188,19 @@ genny-arraypool-node-segments:
rename_type_middle=Segments \
rename_constructor=newSegmentsArrayPool

# arraypool generation rule for ./storage/index/AggregateResultsEntryArrayPool
.PHONY: genny-arraypool-aggregate-results-entry
genny-arraypool-aggregate-results-entry:
cd $(m3x_package_path) && make genny-arraypool \
pkg=index \
elem_type=AggregateResultsEntry \
target_package=$(m3db_package)/src/dbnode/storage/index \
out_file=aggregate_results_entry_arraypool_gen.go \
rename_type_prefix=AggregateResultsEntry \
rename_type_middle=AggregateResultsEntry \
rename_constructor=NewAggregateResultsEntryArrayPool \
rename_gen_types=true \

# generation rule for all generated leakcheckpools
.PHONY: genny-leakcheckpool-all
genny-leakcheckpool-all: \
Expand Down
134 changes: 96 additions & 38 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/storage/namespace"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/idx"
m3ninxindex "github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/index/segment/builder"
Expand Down Expand Up @@ -72,6 +73,10 @@ const (
nsIndexReportStatsInterval = 10 * time.Second
)

var (
allQuery = idx.NewAllQuery()
)

// nolint: maligned
type nsIndex struct {
state nsIndexState
Expand Down Expand Up @@ -167,6 +172,23 @@ type newNamespaceIndexOpts struct {
newBlockFn newBlockFn
}

// execBlockQueryFn executes a query against the given block whilst tracking state.
type execBlockQueryFn func(
cancellable *resource.CancellableLifetime,
block index.Block,
query index.Query,
opts index.QueryOptions,
state *asyncQueryExecState,
results index.BaseResults,
)

// asyncQueryExecState tracks the async execution errors and results for a query.
type asyncQueryExecState struct {
sync.Mutex
multiErr xerrors.MultiError
exhaustive bool
}

// newNamespaceIndex returns a new namespaceIndex for the provided namespace.
func newNamespaceIndex(
nsMD namespace.Metadata,
Expand Down Expand Up @@ -265,6 +287,7 @@ func newNamespaceIndexWithOptions(
queryWorkersPool: newIndexOpts.opts.QueryIDsWorkerPool(),
metrics: newNamespaceIndexMetrics(indexOpts, instrumentOpts),
}

if runtimeOptsMgr != nil {
idx.runtimeOptsListener = runtimeOptsMgr.RegisterListener(idx)
}
Expand Down Expand Up @@ -869,7 +892,7 @@ func (i *nsIndex) Query(
SizeLimit: opts.Limit,
})
ctx.RegisterFinalizer(results)
exhaustive, err := i.query(ctx, query, results, opts)
exhaustive, err := i.query(ctx, query, results, opts, i.execBlockQueryFn)
if err != nil {
return index.QueryResult{}, err
}
Expand All @@ -892,7 +915,12 @@ func (i *nsIndex) AggregateQuery(
Type: opts.Type,
})
ctx.RegisterFinalizer(results)
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions)
// use appropriate fn to query underlying blocks.
fn := i.execBlockQueryFn
if query.Equal(allQuery) {
fn = i.execBlockAggregateQueryFn
}
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn)
if err != nil {
return index.AggregateQueryResult{}, err
}
Expand All @@ -907,6 +935,7 @@ func (i *nsIndex) query(
query index.Query,
results index.BaseResults,
opts index.QueryOptions,
execBlockFn execBlockQueryFn,
) (bool, error) {
// Capture start before needing to acquire lock.
start := i.nowFn()
Expand Down Expand Up @@ -942,50 +971,19 @@ func (i *nsIndex) query(
}

var (
deadline = start.Add(timeout)
wg sync.WaitGroup

// State contains concurrent mutable state for async execution below.
state = struct {
sync.Mutex
multiErr xerrors.MultiError
exhaustive bool
}{
state = asyncQueryExecState{
exhaustive: true,
}
deadline = start.Add(timeout)
wg sync.WaitGroup
)

// Create a cancellable lifetime and cancel it at end of this method so that
// no child async task modifies the result after this method returns.
cancellable := resource.NewCancellableLifetime()
defer cancellable.Cancel()

execBlockQuery := func(block index.Block) {
blockExhaustive, err := block.Query(cancellable, query, opts, results)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
// retention regardless, so this is a non-issue.
err = nil
}

state.Lock()
defer state.Unlock()

if err != nil {
state.multiErr = state.multiErr.Add(err)
return
}

if blockExhaustive {
return
}

// If block had more data but we stopped early, need to notify caller.
state.exhaustive = false
}

for _, block := range blocks {
// Capture block for async query execution below.
block := block
Expand All @@ -1011,7 +1009,7 @@ func (i *nsIndex) query(
// No timeout, just wait blockingly for a worker.
wg.Add(1)
i.queryWorkersPool.Go(func() {
execBlockQuery(block)
execBlockFn(cancellable, block, query, opts, &state, results)
wg.Done()
})
continue
Expand All @@ -1022,7 +1020,7 @@ func (i *nsIndex) query(
if timeLeft := deadline.Sub(i.nowFn()); timeLeft > 0 {
wg.Add(1)
timedOut := !i.queryWorkersPool.GoWithTimeout(func() {
execBlockQuery(block)
execBlockFn(cancellable, block, query, opts, &state, results)
wg.Done()
}, timeLeft)

Expand Down Expand Up @@ -1087,6 +1085,66 @@ func (i *nsIndex) query(
return exhaustive, nil
}

func (i *nsIndex) execBlockQueryFn(
cancellable *resource.CancellableLifetime,
block index.Block,
query index.Query,
opts index.QueryOptions,
state *asyncQueryExecState,
results index.BaseResults,
) {
blockExhaustive, err := block.Query(cancellable, query, opts, results)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
// retention regardless, so this is a non-issue.
err = nil
}

state.Lock()
defer state.Unlock()

if err != nil {
state.multiErr = state.multiErr.Add(err)
}
state.exhaustive = state.exhaustive && blockExhaustive
}

func (i *nsIndex) execBlockAggregateQueryFn(
cancellable *resource.CancellableLifetime,
block index.Block,
query index.Query,
opts index.QueryOptions,
state *asyncQueryExecState,
results index.BaseResults,
) {
aggResults, ok := results.(index.AggregateResults)
if !ok { // should never happen
state.Lock()
state.multiErr = state.multiErr.Add(
fmt.Errorf("unknown results type [%T] received during aggregation", results))
state.Unlock()
return
}

blockExhaustive, err := block.Aggregate(cancellable, opts, aggResults)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
// retention regardless, so this is a non-issue.
err = nil
}

state.Lock()
defer state.Unlock()
if err != nil {
state.multiErr = state.multiErr.Add(err)
}
state.exhaustive = state.exhaustive && blockExhaustive
}

func (i *nsIndex) timeoutForQueryWithRLock(
ctx context.Context,
) time.Duration {
Expand Down
43 changes: 43 additions & 0 deletions src/dbnode/storage/index/aggregate_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,49 @@ func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, error) {
return size, err
}

func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions {
return r.aggregateOpts
}

func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) int {
r.Lock()
for _, entry := range batch {
f := entry.Field
aggValues, ok := r.resultsMap.Get(f)
if !ok {
aggValues = r.valuesPool.Get()
// we can avoid the copy because we assume ownership of the passed ident.ID,
// but still need to finalize it.
r.resultsMap.SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: false,
})
} else {
// because we already have a entry for this field, we release the ident back to
// the underlying pool.
f.Finalize()
}
valuesMap := aggValues.Map()
for _, t := range entry.Terms {
if !valuesMap.Contains(t) {
// we can avoid the copy because we assume ownership of the passed ident.ID,
// but still need to finalize it.
valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: false,
})
} else {
// because we already have a entry for this term, we release the ident back to
// the underlying pool.
t.Finalize()
}
}
}
size := r.resultsMap.Len()
r.Unlock()
return size
}

func (r *aggregatedResults) addDocumentsBatchWithLock(
batch []doc.Document,
) error {
Expand Down
Loading

0 comments on commit 0185c0e

Please sign in to comment.