[index] Aggregating results on storage side #1463

Merged: 23 commits, Mar 26, 2019

@arnikola (Collaborator) commented Mar 15, 2019

genny-map-storage-namespace-metadata \
genny-map-storage-repair \
genny-map-storage-index-results \
genny-map-storage-index-search-results \
Collaborator Author

oops obviously doesn't exist

# Map generation rule for storage/index/AggregationResultsMap
.PHONY: genny-map-storage-index-aggregation-results
genny-map-storage-index-aggregation-results: install-m3x-repo
cd $(m3x_package_path) && make hashmap-gen \
Collaborator

s/hashmap-gen/idhashmap-gen

It’s the target for genny maps with ident.ID keys, so then you can get rid of the key_type specifier below.

@arnikola changed the title from "[wip] initial interfaces for aggregate results on storage side" to "[index] Aggregating results on storage side" on Mar 18, 2019
@arnikola marked this pull request as ready for review on March 18, 2019 17:30
ctx context.Context,
query index.Query,
opts index.QueryOptions,
) (index.QueryResults, error)
Collaborator

need to update the return value

ctx context.Context,
query index.Query,
opts index.QueryOptions,
) (index.QueryResults, error)
Collaborator

here too

@@ -65,6 +65,15 @@ type QueryOptions struct {
StartInclusive time.Time
EndExclusive time.Time
Limit int

// Optional param to filter aggregate values.
TermFilter *AggregateValuesMap
Collaborator

break into separate struct - i.e. AggregateQueryOptions v QueryOptions should be separate
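
One way to read this suggestion is sketched below: `QueryOptions` keeps the shared time-range and limit fields, and the aggregate-only filter moves into its own struct. The name `AggregateQueryOptions` comes from the comment; the embedding and the `[][]byte` filter type are assumptions made for illustration, not the PR's final shape.

```go
package main

import (
	"fmt"
	"time"
)

// QueryOptions mirrors the fields shown in the diff above.
type QueryOptions struct {
	StartInclusive time.Time
	EndExclusive   time.Time
	Limit          int
}

// AggregateQueryOptions is a hypothetical separate type: it embeds the
// common options and adds the aggregate-only term filter.
type AggregateQueryOptions struct {
	QueryOptions
	// TermFilter optionally restricts which terms are aggregated.
	// The [][]byte representation is an assumption for this sketch.
	TermFilter [][]byte
}

func main() {
	now := time.Now()
	opts := AggregateQueryOptions{
		QueryOptions: QueryOptions{
			StartInclusive: now.Add(-time.Hour),
			EndExclusive:   now,
			Limit:          100,
		},
		TermFilter: [][]byte{[]byte("city")},
	}
	// Embedded fields stay reachable without extra qualification.
	fmt.Println(opts.Limit, len(opts.TermFilter))
}
```

With this split, regular queries never see a filter field that does not apply to them.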

@@ -157,20 +157,6 @@ genny-map-storage-index-results: install-m3x-repo
# Rename generated map file
mv -f $(m3db_package_path)/src/dbnode/storage/index/map_gen.go $(m3db_package_path)/src/dbnode/storage/index/results_map_gen.go

# Map generation rule for storage/index/AggregationValuesMap
.PHONY: genny-map-storage-index-aggregation-values
Collaborator

why delete this?

Collaborator Author

I think this one was github weirdness, right? Or still a valid comment?

# Map generation rule for dependent generated maps which are built on top of a
# generated map
.PHONY: genny-map-dependent-all
genny-map-dependent-all: \
Collaborator

don't think you need to break it up like this. i.e. you can just have

genny-map-all:
  ...
  genny-map-storage-index-aggregation-results

genny-map-storage-index-aggregation-results: genny-map-whichever-one-this-depends-on
   ...

errUnableToAddAggregateResultMissingID = errors.New("no id for result")
)

type aggregatedResults struct {
Collaborator

need tests for this struct verifying a few different things:

  • aggregation results
  • copying/non-copying behaviour
  • finalizing/non-finalizing behaviour

Collaborator Author

Yeah, this is missing tests in general at the moment; looking to verify structure of the feature in general before writing them up

document doc.Document,
opts QueryOptions,
) error {
// TODO: is this necessary to check for document correctness?
Collaborator

why do this?

Collaborator Author

I guess there's my answer :p

Was going to check with someone more versed in the code than I to see if this represents an error case

return nil
}

func (r *aggregatedResults) AggregateDocument(
Collaborator

why have AggregateDocument and AddIDAndValues separate?

Collaborator Author

This is to match the flow of the regular Query case. When a block initially matches, its results are added through the AggregateDocument path. Then, when results from multiple blocks are merged, one is selected as the merging AggregateResult, and the rest are iterated over, calling AddIDAndValues on the selected one to merge the results.
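
The two paths described here can be sketched roughly as follows. This is a simplified illustration, not the PR's code: the types are stand-ins built on plain Go maps, and the method signatures are assumptions.

```go
package main

import "fmt"

// field and document are simplified stand-ins for the index document types.
type field struct {
	Name, Value string
}

type document struct {
	Fields []field
}

// aggregateResults maps a tag name to the set of values seen for it.
type aggregateResults struct {
	terms map[string]map[string]struct{}
}

func newAggregateResults() *aggregateResults {
	return &aggregateResults{terms: make(map[string]map[string]struct{})}
}

// AggregateDocument is the per-block path: fold one matched document in.
func (r *aggregateResults) AggregateDocument(d document) {
	for _, f := range d.Fields {
		r.add(f.Name, f.Value)
	}
}

// AddIDAndValues is the merge path: fold the already-aggregated values for
// one tag name (the "id") into this result.
func (r *aggregateResults) AddIDAndValues(id string, values map[string]struct{}) {
	for v := range values {
		r.add(id, v)
	}
}

// add records a single value under a tag name, creating the set on demand.
func (r *aggregateResults) add(name, value string) {
	set, ok := r.terms[name]
	if !ok {
		set = make(map[string]struct{})
		r.terms[name] = set
	}
	set[value] = struct{}{}
}

func main() {
	block0, block1 := newAggregateResults(), newAggregateResults()
	block0.AggregateDocument(document{Fields: []field{{"city", "nyc"}}})
	block1.AggregateDocument(document{Fields: []field{{"city", "sf"}, {"env", "prod"}}})

	// Pick block0 as the merge target and fold block1 into it.
	for id, values := range block1.terms {
		block0.AddIDAndValues(id, values)
	}
	fmt.Println(len(block0.terms["city"]), len(block0.terms["env"]))
}
```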

Collaborator

hm i think you can do AggregateDocument or AddAggregatedValues (doesn't need ID in the second method)

}

// if this term hasn't been seen, ensure it should be included in output.
if !opts.TermFilter.Contains(termID) {
Collaborator

hm should this be the first thing you do in the function?

Collaborator Author

So I went back and forth on this; if it's already been added to the map, that indicates it's a valid value and there's no reason to check against the term filter

I guess this should be arranged by what's more likely; failing the filter lookup, or succeeding (the current way is biased towards succeeding)

Collaborator

Mainly suggesting to do it early cause the flow made more sense, early terminating and all that. Don't think it makes a difference for perf really

// including returning it to a backing pool.
Finalize()

// NoFinalize marks the AggregateResults such that a subsequent call to
Collaborator

do you need this method?

Collaborator Author

Likely not; will remove

// Reset resets the AggregateResults object to initial state.
Reset(nsID ident.ID)

// Finalize releases any resources held by the AggregateResults object,
Collaborator

same here, is this needed?

Collaborator Author

Likely not; will remove

}

// AggregateValues is a collection of values for an aggregation query.
type AggregateValues interface {
Collaborator

is this just a wrapper on a code-gen'd type, if so - can we use the struct where needed instead?

Collaborator Author

Good call, I'll give it a shot, hopefully the code gen will still be happy with it

@@ -169,6 +271,13 @@ type Block interface {
results Results,
) (exhaustive bool, err error)

// AggregateQuery resolves the given query into aggregated tags.
AggregateQuery(
Collaborator

same nit about opts type

// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
// retention regardless, so this is a non-issue.
err = nil
Collaborator

can you just return early here?

Collaborator Author

Yeah, seems to be the case

wg sync.WaitGroup

// Results contains all concurrent mutable state below.
results = struct {
Collaborator

can you extract this into a separate struct and reuse it across both places

Collaborator Author

Sure, had something like that originally while trying to jam it into a combined function :)
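
A minimal sketch of what extracting the anonymous struct could look like, assuming the shared state is an exhaustive flag plus collected errors. The `queryState` name, its fields, and `runBlocks` are hypothetical; the real struct in the PR may hold different state.

```go
package main

import (
	"fmt"
	"sync"
)

// queryState is a hypothetical named replacement for the anonymous struct:
// it groups the state that concurrent block queries mutate.
type queryState struct {
	sync.Mutex
	exhaustive bool
	errs       []error
}

// record merges the outcome of a single block query under the lock.
func (s *queryState) record(exhaustive bool, err error) {
	s.Lock()
	defer s.Unlock()
	if err != nil {
		s.errs = append(s.errs, err)
		return
	}
	s.exhaustive = s.exhaustive && exhaustive
}

// runBlocks runs one goroutine per block query and collects the outcomes in
// a single queryState, which both Query and AggregateQuery could share.
func runBlocks(blocks []func() (bool, error)) *queryState {
	var wg sync.WaitGroup
	state := &queryState{exhaustive: true}
	for _, query := range blocks {
		wg.Add(1)
		go func(q func() (bool, error)) {
			defer wg.Done()
			state.record(q())
		}(query)
	}
	wg.Wait()
	return state
}

func main() {
	state := runBlocks([]func() (bool, error){
		func() (bool, error) { return true, nil },
		func() (bool, error) { return false, nil },
	})
	fmt.Println(state.exhaustive, len(state.errs))
}
```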

mergedResult = true
for _, entry := range aggregateResults.Map().Iter() {
// Append to merged results.
id, tags := entry.Key(), entry.Value()
Collaborator

why is the ID needed in aggregation results?

Collaborator Author

Probably a bad name; id here represents unique tag names

@@ -1436,3 +1440,230 @@ func (shards dbShards) IDs() []uint32 {
}
return ids
}

func (i *nsIndex) AggregateQuery(
Collaborator

nit: refactor this so that Query() and AggregateQuery() share the structural components of the method.

Collaborator Author

Was trying that but it got a little messy, will take another stab at it

valueBytes.IncRef()
valueBytes.AppendAll(value)
valueBytes.DecRef()
valueID := r.idPool.BinaryID(valueBytes)
Collaborator @robskillington, Mar 26, 2019

Sorry, I realize you can actually use a helper function on the ID pool to make this code a little simpler:

valueID := r.idPool.Clone(ident.BytesID(value))

Collaborator Author

Nice 👍


// NB: fine to overwrite the values here.
v.valuesMap.Set(bytesID, struct{}{})
return nil
Collaborator

Can just condense this (since .Set(...) takes an ident.ID and will copy it) into:

	if len(value.Bytes()) == 0 {
		return errUnableToAddValueMissingID
	}

	// NB: fine to overwrite the values here.
	v.valuesMap.Set(value, struct{}{})
	return nil

Collaborator Author

Yeah already have sorry, latest push was halfway consolidated :p

@@ -598,24 +598,49 @@ func (n *dbNamespace) QueryIDs(
ctx context.Context,
query index.Query,
opts index.QueryOptions,
-) (index.QueryResults, error) {
+) (index.QueryResult, error) {
Collaborator

+1 for rename

}

res, err := n.reverseIndex.AggregateQuery(ctx, query, opts, aggResultOpts)
n.metrics.queryIDs.ReportSuccessOrError(err, n.nowFn().Sub(callStart))
Collaborator

nit: add a metric for aggregateQuery (similar to queryIDs)

Collaborator

(and reuse in all calls in this function)
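
A rough sketch of the suggestion, assuming a dedicated `aggregateQuery` metric that sits next to `queryIDs`. The `methodMetrics` type here is a hypothetical stand-in that only counts outcomes; only the `ReportSuccessOrError` method name is taken from the diff above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// methodMetrics is a hypothetical stand-in for the metrics type behind
// n.metrics.queryIDs; it only counts outcomes and sums latency.
type methodMetrics struct {
	successes int
	failures  int
	total     time.Duration
}

// ReportSuccessOrError records one call as a success or a failure.
func (m *methodMetrics) ReportSuccessOrError(err error, d time.Duration) {
	m.total += d
	if err != nil {
		m.failures++
		return
	}
	m.successes++
}

// namespaceMetrics keeps a separate metric per method so aggregate queries
// are not counted as queryIDs calls.
type namespaceMetrics struct {
	queryIDs       methodMetrics
	aggregateQuery methodMetrics
}

// aggregateQuery reports the outcome of run against the dedicated metric.
func aggregateQuery(m *namespaceMetrics, run func() error) error {
	callStart := time.Now()
	err := run()
	m.aggregateQuery.ReportSuccessOrError(err, time.Since(callStart))
	return err
}

func main() {
	var m namespaceMetrics
	_ = aggregateQuery(&m, func() error { return nil })
	_ = aggregateQuery(&m, func() error { return errors.New("no reverse index") })
	fmt.Println(m.aggregateQuery.successes, m.aggregateQuery.failures, m.queryIDs.successes)
}
```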

Collaborator Author

Nice one 👍

@@ -566,26 +566,41 @@ func TestNamespaceIndexBlockQuery(t *testing.T) {
StartInclusive: t0,
EndExclusive: now.Add(time.Minute),
}
b0.EXPECT().Query(gomock.Any(), q, qOpts, gomock.Any()).Return(true, nil)
aggOpts := index.AggregateResultsOptions{}
Collaborator

nit: could you decouple the tests for the Query and Aggregate. One simple way - copy paste the test you have twice, and make it only do the mocks/calls for Query in one, and Aggregate in the other. That way each is independent of the other. Makes maintenance easier in the long run

Collaborator Author

Yeah, for sure. I didn't want to go ahead and c/p larger tests so figured would be fine to package them; will refactor to use a test setup method

Collaborator Author

Tried with a setup method and it looked really bad; did the test twice instead

return v.valuesMap.Len()
}

func (v *AggregateValues) reset() {
Collaborator

more a question: is it intentional these methods are un-exported?

Collaborator Author

Yeah, wanted to restrict these methods to this package so callers don't accidentally close the AggregateValues

) error {
for _, field := range document.Fields {
if err := r.addFieldWithLock(field.Name, field.Value); err != nil {
return err
Collaborator

nit: would be useful to bundle with document here too, something like:
return fmt.Errorf("unable to add document [%+v]: %v", document, err)
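
In context, the wrapped error might look like the sketch below. The `document` and `field` types use strings for readability and `addField` is a hypothetical stand-in for `addFieldWithLock`; only the format string is taken from the comment.

```go
package main

import (
	"errors"
	"fmt"
)

// field and document are simplified stand-ins for the index document types.
type field struct {
	Name, Value string
}

type document struct {
	Fields []field
}

var errMissingValue = errors.New("missing value")

// addField is a hypothetical stand-in for addFieldWithLock.
func addField(name, value string) error {
	if value == "" {
		return errMissingValue
	}
	return nil
}

// aggregateDocument adds every field and, on failure, reports which document
// could not be added alongside the underlying cause.
func aggregateDocument(d document) error {
	for _, f := range d.Fields {
		if err := addField(f.Name, f.Value); err != nil {
			return fmt.Errorf("unable to add document [%+v]: %v", d, err)
		}
	}
	return nil
}

func main() {
	bad := document{Fields: []field{{Name: "city"}}}
	fmt.Println(aggregateDocument(bad))
}
```

On Go 1.13 and later, using `%w` for the final verb would also keep the cause available to `errors.Is`.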

Collaborator Author

SGTM


// if a term filter is provided, ensure this field matches the filter,
// otherwise ignore it.
if len(r.aggregateOpts.TermFilter) > 0 {
Collaborator

nit: mind making this a function on the type? something like

type AggregateResultsOptions  struct {
  ...
  TermFilter [][]byte
}

type AggregateTermFilter [][]byte

func (a AggregateTermFilter) Filter(term []byte) bool {
  if len(a) == 0 {
    return false 
  }
  for ...
  return true
}
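
A possible completion of this sketch is shown below. The loop body is an assumption: it treats the filter as an allow-list compared with `bytes.Equal`, so `Filter` returns true when the term should be dropped, which matches the return values in the outline above.

```go
package main

import (
	"bytes"
	"fmt"
)

// AggregateTermFilter lists the terms to keep; an empty filter keeps all.
type AggregateTermFilter [][]byte

// Filter reports whether term should be dropped from the results.
func (a AggregateTermFilter) Filter(term []byte) bool {
	if len(a) == 0 {
		// No filter configured: nothing is dropped.
		return false
	}
	for _, allowed := range a {
		if bytes.Equal(term, allowed) {
			return false
		}
	}
	return true
}

func main() {
	f := AggregateTermFilter{[]byte("city"), []byte("env")}
	fmt.Println(f.Filter([]byte("city")), f.Filter([]byte("host")))
	fmt.Println(AggregateTermFilter(nil).Filter([]byte("host")))
}
```

A linear scan is reasonable for a handful of filter terms; a larger filter might justify a map keyed by the term string.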

Collaborator Author

SGTM, guess then we can use it elsewhere :)

return fmt.Errorf(missingDocumentFields, "value")
}

// NB: can cast the []byte -> ident.ID to avoid an alloc
Collaborator

nit: shouldn't this be below the filtering code?

Collaborator

as in, termID isn't used until after the filtering. would be better to group along with the other vars there.

Collaborator Author

Ah yeah good one; the filter function previously required an ident.ID so had to define this earlier, but missed changing it after the refactor

func (r *aggregatedResults) Finalize() {
r.Lock()

r.aggregateOpts = AggregateResultsOptions{}
Collaborator

hm could you reuse r.Reset(nil, AggregateResultsOptions{}) here? looks like they're both doing the same thing except the pool return

Collaborator

Hm, seems aggressive for now; could we consolidate this later so that getting the find endpoint working isn't blocked?

Collaborator

Sorry, not sure I follow. I was suggesting changing the implementation of Finalize to the following:

func (r *aggregatedResults) Finalize() {
  r.Reset(nil, AggregateResultsOptions{})
  if r.pool == nil {
    return
  }
  r.pool.Put(r)
}
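
As a self-contained illustration of the same shape, the sketch below has `Finalize` delegate to `Reset` and then hand the object back to its pool. All types here are hypothetical stand-ins, and the sketch omits locking and any finalization of contained values that the real type may need.

```go
package main

import "fmt"

// options is a stand-in for AggregateResultsOptions.
type options struct {
	limit int
}

// resultsPool is a hypothetical minimal pool.
type resultsPool struct {
	free []*results
}

// Put returns a results object to the pool for reuse.
func (p *resultsPool) Put(r *results) {
	p.free = append(p.free, r)
}

// results is a simplified stand-in for aggregatedResults.
type results struct {
	nsID  string
	opts  options
	terms map[string]struct{}
	pool  *resultsPool
}

// Reset returns the object to its initial state, keeping the map allocation.
func (r *results) Reset(nsID string, opts options) {
	r.nsID = nsID
	r.opts = opts
	for k := range r.terms {
		delete(r.terms, k)
	}
}

// Finalize clears state via Reset, then returns the object to its pool if
// it has one.
func (r *results) Finalize() {
	r.Reset("", options{})
	if r.pool == nil {
		return
	}
	r.pool.Put(r)
}

func main() {
	pool := &resultsPool{}
	r := &results{nsID: "metrics", terms: map[string]struct{}{"city": {}}, pool: pool}
	r.Finalize()
	fmt.Println(len(r.terms), len(pool.free))
}
```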

Collaborator Author

Hey, so this method is a little different in that it needs to call finalize on the AggregatedValues map; I tried refactoring it so they'd both call a common function with a flag to determine if it should finalize or reset, but it ended up being a bit awkward so preferred this approach

Collaborator

talked offline: made sense to merge the behaviours

@@ -1202,8 +1202,11 @@ func withEncodingAndPoolingOptions(
postingsListOpts := poolOptions(policy.PostingsListPool, scope.SubScope("postingslist-pool"))
postingsList := postings.NewPool(postingsListOpts, roaring.NewPostingsList)

resultsPool := index.NewResultsPool(
poolOptions(policy.IndexResultsPool, scope.SubScope("index-results-pool")))
// Need to actually set pools
Collaborator

nit: Remove this now that the pools are set?

Collaborator Author

Good call

@@ -0,0 +1,47 @@
// Copyright (c) 2018 Uber Technologies, Inc.
Collaborator

nit: 2019.

@@ -0,0 +1,47 @@
// Copyright (c) 2018 Uber Technologies, Inc.
Collaborator

nit: 2019.

contains(t, expected, res.Map())
}

func toFilter(strs ...string) AggregateTermFilter {
Collaborator

nice

assert.True(t, aggVals.Map().Contains(ident.StringID("biz")))
}

func contains(t *testing.T, ex map[string][]string, ac *AggregateResultsMap) {
Collaborator

nit: assertContains() would be clearer


found := false

// our genny generated maps don't provide access to MapEntry directly,
Collaborator

+1, v nice test.

// BaseResults is a collection of basic results for a generic query, it is
// synchronized when access to the results set is used as documented by the
// methods.
type BaseResults interface {
Collaborator

+1, nice

Collaborator @prateek left a comment

LGTM

Collaborator @robskillington left a comment

LGTM

@arnikola merged commit 5e4d4f6 into master on Mar 26, 2019
@arnikola deleted the arnikola/index_tag_search branch on March 26, 2019 23:28
3 participants