Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Moved to our own custom posting helpers. #753

Merged
merged 1 commit into from
Jan 25, 2019
Merged

Conversation

bwplotka
Copy link
Member

@bwplotka bwplotka commented Jan 22, 2019

This is necessary to support newest TSDB, so blocker for: #704

Newest optimization (http://github.com/prometheus/tsdb/pull/486) makes tsdb.PostingForMatcher impossible to use.

This also hopefully reduce amount of code to understand as we don't need to necessarily fit into
index.Postings even though we don't need streaming for now. This should make the code less complex and more readable (in comparision to previous lazyPostings implementation)

Benchmarks to come.

Signed-off-by: Bartek Plotka [email protected]

@bwplotka
Copy link
Member Author

bwplotka commented Jan 22, 2019

Benchmarks:

Current master branch:

goos: linux
goarch: amd64
pkg: github.com/improbable-eng/thanos/pkg/store
BenchmarkBucketStore_Series-12    	  100000	    771896 ns/op	   59657 B/op	     729 allocs/op
PASS
ok  	github.com/improbable-eng/thanos/pkg/store	86.232s

This PR:

goos: linux
goarch: amd64
pkg: github.com/improbable-eng/thanos/pkg/store
BenchmarkBucketStore_Series-12    	  100000	    766911 ns/op	   60421 B/op	     735 allocs/op
PASS
ok  	github.com/improbable-eng/thanos/pkg/store	86.234s

Test: https://gist.github.com/bwplotka/b297ada1b8cc9c14087f8307233bff5a

So it seems like there is no much difference in terms of performance.. new version is slightly worse. Parity check ✔️

Can spend some time to look if we can have quick gains.

@@ -1188,89 +1150,200 @@ func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketB
return r
}

func (r *bucketIndexReader) preloadPostings() error {
const maxGapSize = 512 * 1024
func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method will be used in the Upgrade TSDB PR.

@bwplotka
Copy link
Member Author

bwplotka commented Jan 23, 2019

In terms of CPU, most of it is heap and mallocgc..
image

In terms of memory, nothing extra ordinary either. Mostly chunk pool that alloc bytes which is reasonable as we benchmark, since we use 12 CPU cores... We might want to have concurrent query limit, to make sure pool can reuse memory better.

Copy link
Contributor

@domgreen domgreen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All looks good, took a while to review/understand in depth what was happening.

Only thing I can see that I don't think is correct is the incrementing of the stats for postingsFetched.

// on object storage.
// Found posting IDs (ps) are not strictly required to point to a valid Series, e.g. during
// background garbage collections.
func (r *bucketIndexReader) ExpandedPostings(ms []labels.Matcher) ([]uint64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to add a high-level overview of what a posting is just so it is more understandable for readers:

A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first chunk where the series contains the matching label-value pair for a given block of data. 

Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Extended bit as well.

return nil, stats, errors.Wrap(err, "expand postings")
}
stats := &queryStats{}
stats = stats.merge(indexr.stats)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we do this after the if len(ps)==0 and just return indexr.stats if we have storepb.EmptySeriesSet?

for _, m := range ms {
matching, err := matchingLabels(r.LabelValues, m)
if err != nil {
return nil, errors.Wrap(err, "match postings")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

match postings > matching labels?

_, l, err := r.dec.Postings(c)
if err != nil {
return errors.Wrap(err, "read postings list")
// fetchPostings returns sorted slice of postings.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetchPostings returns sorted slice of postings that match the selected labels.

defer r.mtx.Unlock()

r.stats.postingsFetchCount++
r.stats.postingsFetched += len(ptrs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

len(ptrs) > j - i?
It seems to me that we are increasing by the number of pointers in each go routine, therefore, this would be a much bigger value than it, in fact, should be.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or could just increment it in the loop below r.stats.postingFetched++ for each pointer

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot increment in loop below as they are not fetched yet there. Otherwise you comment makes sense it's a bug, should be j-i, thanks!

for _, p := range ptrs[i:j] {
c := b[p.ptr.Start-start : p.ptr.End-start]

_, l, err := r.dec.Postings(c)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

l > fetchedPostings?

return nil, err
}

return index.Merge(postings...), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be worth checking here if the returned can be cast to ErrPostings?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm.. This will bubble up eventually after expand operation. ErrPostings is just a postings with Next = false and some Err. I think it's fine for now, we can check for errors early if that will cause problems (it will be hard to find cause of error) .

// at the loading level.
if r.block.indexVersion >= 2 {
for i, id := range ps {
ps[i] = id * 16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I understand this how does multiplying by 16 ensure its padded correctly? Maybe speak offline.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's 1:1 copied with what it was.

Basically TSDB are now storing those reference every 16 bytes. The index format changed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhhhh, okay now I understand we are calculating the offset which as we are saving everything as a fixed byte range we need to ensure we calculate the offset of id to point to the correct bytes 👍

Copy link
Contributor

@domgreen domgreen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

  • just one nit on variable name spelling.


start := int64(p.start)
// We assume index does not have any ptrs that has 0 length.
lenght := int64(p.end) - start
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lenght > length

// at the loading level.
if r.block.indexVersion >= 2 {
for i, id := range ps {
ps[i] = id * 16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhhhh, okay now I understand we are calculating the offset which as we are saving everything as a fixed byte range we need to ensure we calculate the offset of id to point to the correct bytes 👍

// gets created each. This way we can easily verify we got 10 chunks per series below.
id1, err := testutil.CreateBlock(dir, series[:4], 10, mint, maxt, extLset, 0)
testutil.Ok(t, err)
id2, err := testutil.CreateBlock(dir, series[4:], 10, mint, maxt, extLset, 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't those blocks overlapping? And if yes, isn't that an issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They have different external labels 😉

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but thanks for being careful here!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

off-course 🤦‍♂️ sorry and thanks for pointing out

rctx, rcancel := context.WithTimeout(ctx, 30*time.Second)
defer rcancel()
testutil.Ok(t, runutil.Retry(100*time.Millisecond, rctx.Done(), func() error {
if store.numBlocks() < 6 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm.. might be somehow noted above that we create 2 blocks for each of 3 time slots?
Or maybe even make this a variable and increment it each time you upload a block so it's not hardcoded?

This is necessary to support newest TSDB.

Newest optimization (http://github.com/prometheus/tsdb/pull/486) makes tsdb.PostingForMatcher impossible to use.

This also hopefully reduce amount of code to understand as we don't need to necessarily fit into
index.Postings even though we don't need streaming for now. This should make the code less complex and more readable
(in comparision to previous `lazyPostings` implementation.

Signed-off-by: Bartek Plotka <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants