store: Fixed binary header bug that was causing all postings to be kept in memory instead of 1/32 as we meant. (#2390)

* store: Fixed binary header bug that was causing all postings to be kept in memory instead of 1/32 as we meant.

Spotted by @mkabischev! Thanks to you and @d-ulyanov as well! Epic finding +1
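
The gist of the bug, as a minimal sketch (hypothetical helper names, not the verbatim loop from binary_reader.go): the sampled branch returned early and skipped the counter increment, so the modulo check always saw zero and every posting offset was kept in memory.

	// Before: the early return skips the valueCount++ below, so valueCount
	// never moves past 0 and every value lands in the "keep" branch.
	if valueCount%symbolFactor == 0 {
		keepOffset(key, tableOff) // stand-in for the append to r.postings
		return nil
	}
	lastTableOff = tableOff
	valueCount++
	return nil

	// After: count first, then keep 1 out of every postingOffsetsInMemSampling values.
	lastTableOff = tableOff
	valueCount++
	if (valueCount-1)%postingOffsetsInMemSampling == 0 {
		keepOffset(key, tableOff)
	}
	return nil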


Test output before fix:
					testutil.Equals(t, 1, br.version)
					testutil.Equals(t, 2, br.indexVersion)
					testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 66}, br.toc)
					testutil.Equals(t, int64(626), br.indexLastPostingEnd)
					testutil.Equals(t, 8, br.symbols.Size())
					testutil.Equals(t, map[string]*postingValueOffsets{
						"": {
							offsets:       []postingOffset{{value: "", tableOff: 4}},
							lastValOffset: 392,
						},
						"a": {
							offsets: []postingOffset{
								{value: "1", tableOff: 9},
								{value: "11", tableOff: 16},
								{value: "12", tableOff: 24},
								{value: "2", tableOff: 32},
								{value: "3", tableOff: 39},
								{value: "4", tableOff: 46},
								{value: "5", tableOff: 53},
								{value: "6", tableOff: 60},
								{value: "7", tableOff: 67},
								{value: "8", tableOff: 74},
								{value: "9", tableOff: 81},
							},
							lastValOffset: 572,
						},
						"longer-string": {
							offsets:       []postingOffset{{value: "1", tableOff: 88}},
							lastValOffset: 622,
						},
					}, br.postings)
					testutil.Equals(t, 0, len(br.postingsV1))
					testutil.Equals(t, 2, len(br.nameSymbols))

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Added CHANGELOG item.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Fixed build errors.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Addressed Lucas' comment.

Signed-off-by: Bartlomiej Plotka <[email protected]>
bwplotka authored Apr 8, 2020
1 parent 7b9d72d commit c0b9179
Showing 7 changed files with 100 additions and 44 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -24,6 +24,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2208](https://github.com/thanos-io/thanos/pull/2208) Query and Rule: fix handling of `web.route-prefix` to correctly handle `/` and prefixes that do not begin with a `/`.
- [#2311](https://github.com/thanos-io/thanos/pull/2311) Receive: ensure receive component serves TLS when TLS configuration is provided.
- [#2319](https://github.com/thanos-io/thanos/pull/2319) Query: fixed inconsistent naming of metrics.
- [#2390](https://github.com/thanos-io/thanos/pull/2390) Store: Fixed bug which was causing all posting offsets to be used instead of 1/32 as it was meant.
Added hidden flag to control this behavior.

### Added

9 changes: 9 additions & 0 deletions cmd/thanos/store.go
@@ -5,6 +5,7 @@ package main

import (
"context"
"fmt"
"path"
"time"

@@ -86,6 +87,11 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
disableIndexHeader := cmd.Flag("store.disable-index-header", "If specified, Store Gateway will use index-cache.json for each block instead of recreating binary index-header").
Hidden().Default("false").Bool()

postingOffsetsInMemSampling := cmd.Flag("store.index-header-posting-offsets-in-mem-sampling", "Controls what is the ratio of postings offsets store will hold in memory. "+
"Larger value will keep less offsets, which will increase CPU cycles needed for query touching those postings. It's meant for setups that want low baseline memory pressure and where less traffic is expected. "+
"On the contrary, smaller value will increase baseline memory usage, but improve latency slightly. 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance. This works only when --store.disable-index-header is NOT specified.").
Hidden().Default(fmt.Sprintf("%v", store.DefaultPostingOffsetInMemorySampling)).Int()

enablePostingsCompression := cmd.Flag("experimental.enable-index-cache-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size.").
Hidden().Default("false").Bool()

@@ -142,6 +148,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
time.Duration(*ignoreDeletionMarksDelay),
*webExternalPrefix,
*webPrefixHeaderName,
*postingOffsetsInMemSampling,
)
}
}
@@ -171,6 +178,7 @@ func runStore(
consistencyDelay time.Duration,
ignoreDeletionMarksDelay time.Duration,
externalPrefix, prefixHeader string,
postingOffsetsInMemSampling int,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
@@ -275,6 +283,7 @@ func runStore(
advertiseCompatibilityLabel,
!disableIndexHeader,
enablePostingsCompression,
postingOffsetsInMemSampling,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
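
Note: the new flag is registered with Hidden(), so it will not show up in --help output. For illustration, a hypothetical invocation overriding the sampling (all other flags elided) could look like `thanos store --store.index-header-posting-offsets-in-mem-sampling=64`; leaving it unset keeps the Prometheus-matching default of 32.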
46 changes: 26 additions & 20 deletions pkg/block/indexheader/binary_reader.go
@@ -42,8 +42,6 @@ const (
// MagicIndex are 4 bytes at the head of an index-header file.
MagicIndex = 0xBAAAD792

symbolFactor = 32

postingLengthFieldSize = 4
)

@@ -428,11 +426,12 @@ type BinaryReader struct {
c io.Closer

// Map of LabelName to a list of some LabelValues's position in the offset table.
// The first and last values for each name are always present.
// The first and last values for each name are always present, we keep only 1/postingOffsetsInMemSampling of the rest.
postings map[string]*postingValueOffsets
// For the v1 format, labelname -> labelvalue -> offset.
postingsV1 map[string]map[string]index.Range

// Symbols struct that keeps only 1/postingOffsetsInMemSampling in the memory, then looks up the rest via mmap.
symbols *index.Symbols
nameSymbols map[uint32]string // Cache of the label name symbol lookups,
// as there are not many and they are half of all lookups.
@@ -442,12 +441,14 @@ type BinaryReader struct {
version int
indexVersion int
indexLastPostingEnd int64

postingOffsetsInMemSampling int
}

// NewBinaryReader loads or builds new index-header if not present on disk.
func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*BinaryReader, error) {
func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (*BinaryReader, error) {
binfn := filepath.Join(dir, id.String(), block.IndexHeaderFilename)
br, err := newFileBinaryReader(binfn)
br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling)
if err == nil {
return br, nil
}
@@ -460,11 +461,10 @@ func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bucket
}

level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start))

return newFileBinaryReader(binfn)
return newFileBinaryReader(binfn, postingOffsetsInMemSampling)
}

func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *BinaryReader, err error) {
f, err := fileutil.OpenMmapFile(path)
if err != nil {
return nil, err
@@ -476,9 +476,10 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
}()

r := &BinaryReader{
b: realByteSlice(f.Bytes()),
c: f,
postings: map[string]*postingValueOffsets{},
b: realByteSlice(f.Bytes()),
c: f,
postings: map[string]*postingValueOffsets{},
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
}

// Verify header.
@@ -502,6 +503,7 @@ func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *Bina
return nil, errors.Wrap(err, "read index header TOC")
}

// TODO(bwplotka): Consider contributing to Prometheus to allow specifying custom number for symbolsFactor.
r.symbols, err = index.NewSymbols(r.b, r.indexVersion, int(r.toc.Symbols))
if err != nil {
return nil, errors.Wrap(err, "read symbols")
@@ -551,11 +553,12 @@ func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *Bina
}

if _, ok := r.postings[key[0]]; !ok {
// Next label name.
// Not seen before label name.
r.postings[key[0]] = &postingValueOffsets{}
if lastKey != nil {
if valueCount%symbolFactor != 0 {
// Always include last value for each label name.
// Always include last value for each label name, unless it was just added in previous iteration based
// on valueCount.
if (valueCount-1)%postingOffsetsInMemSampling != 0 {
r.postings[lastKey[0]].offsets = append(r.postings[lastKey[0]].offsets, postingOffset{value: lastKey[1], tableOff: lastTableOff})
}
r.postings[lastKey[0]].lastValOffset = int64(off - crc32.Size)
@@ -565,21 +568,24 @@ func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *Bina
}

lastKey = key
if valueCount%symbolFactor == 0 {
lastTableOff = tableOff
valueCount++

if (valueCount-1)%postingOffsetsInMemSampling == 0 {
r.postings[key[0]].offsets = append(r.postings[key[0]].offsets, postingOffset{value: key[1], tableOff: tableOff})
return nil
}

lastTableOff = tableOff
valueCount++
return nil
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
if lastKey != nil {
if valueCount%symbolFactor != 0 {
if (valueCount-1)%postingOffsetsInMemSampling != 0 {
// Always include last value for each label name if not included already based on valueCount.
r.postings[lastKey[0]].offsets = append(r.postings[lastKey[0]].offsets, postingOffset{value: lastKey[1], tableOff: lastTableOff})
}
// In any case lastValOffset is unknown as don't have next posting anymore. Guess from TOC table.
// In worst case we will overfetch a few bytes.
r.postings[lastKey[0]].lastValOffset = r.indexLastPostingEnd - crc32.Size
}
// Trim any extra space in the slices.
@@ -787,7 +793,7 @@ func (r BinaryReader) LabelValues(name string) ([]string, error) {
if len(e.offsets) == 0 {
return nil, nil
}
values := make([]string, 0, len(e.offsets)*symbolFactor)
values := make([]string, 0, len(e.offsets)*r.postingOffsetsInMemSampling)

d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil)
d.Skip(e.offsets[0].tableOff)
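
To make the sampling concrete, here is a small self-contained sketch (assumed values, mirroring the 1-in-3 sampling that the updated test below applies to label "a"):

	// With postingOffsetsInMemSampling = 3, the reader keeps every 3rd
	// value's offset plus, always, the last value per label name.
	// Values sort lexicographically, so "11".."13" come before "2".
	vals := []string{"1", "11", "12", "13", "2", "3", "4", "5", "6", "7", "8", "9"}
	var kept []string
	for i, v := range vals {
		if i%3 == 0 || i == len(vals)-1 {
			kept = append(kept, v)
		}
	}
	// kept == ["1", "13", "4", "7", "9"], the same five offsets asserted
	// on br.postings["a"] in header_test.go below.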
46 changes: 38 additions & 8 deletions pkg/block/indexheader/header_test.go
@@ -45,6 +45,15 @@ func TestReaders(t *testing.T) {
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "a", Value: "5"}},
{{Name: "a", Value: "6"}},
{{Name: "a", Value: "7"}},
{{Name: "a", Value: "8"}},
{{Name: "a", Value: "9"}},
// Missing 10 on purpose.
{{Name: "a", Value: "11"}},
{{Name: "a", Value: "12"}},
{{Name: "a", Value: "13"}},
{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124)
testutil.Ok(t, err)
@@ -92,18 +101,37 @@ func TestReaders(t *testing.T) {
fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename)
testutil.Ok(t, WriteBinary(ctx, bkt, id, fn))

br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id)
br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3)
testutil.Ok(t, err)

defer func() { testutil.Ok(t, br.Close()) }()

if id == id1 {
testutil.Equals(t, 1, br.version)
testutil.Equals(t, 2, br.indexVersion)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 50}, br.toc)
testutil.Equals(t, int64(330), br.indexLastPostingEnd)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 69}, br.toc)
testutil.Equals(t, int64(666), br.indexLastPostingEnd)
testutil.Equals(t, 8, br.symbols.Size())
testutil.Equals(t, 3, len(br.postings))
testutil.Equals(t, map[string]*postingValueOffsets{
"": {
offsets: []postingOffset{{value: "", tableOff: 4}},
lastValOffset: 416,
},
"a": {
offsets: []postingOffset{
{value: "1", tableOff: 9},
{value: "13", tableOff: 32},
{value: "4", tableOff: 54},
{value: "7", tableOff: 75},
{value: "9", tableOff: 89},
},
lastValOffset: 612,
},
"longer-string": {
offsets: []postingOffset{{value: "1", tableOff: 96}},
lastValOffset: 662,
},
}, br.postings)
testutil.Equals(t, 0, len(br.postingsV1))
testutil.Equals(t, 2, len(br.nameSymbols))
}
@@ -121,9 +149,9 @@ func TestReaders(t *testing.T) {
defer func() { testutil.Ok(t, jr.Close()) }()

if id == id1 {
testutil.Equals(t, 6, len(jr.symbols))
testutil.Equals(t, 14, len(jr.symbols))
testutil.Equals(t, 2, len(jr.lvals))
testutil.Equals(t, 6, len(jr.postings))
testutil.Equals(t, 14, len(jr.postings))
}

compareIndexToHeader(t, b, jr)
@@ -224,12 +252,14 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].Start, ptr.Start)
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].End, ptr.End)

// Check not existing.
vals, err := indexReader.LabelValues("not-existing")
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

_, err = headerReader.PostingsOffset("not-existing", "1")
testutil.NotOk(t, err)
_, err = headerReader.PostingsOffset("a", "10")
testutil.NotOk(t, err)
}

func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *metadata.Meta {
@@ -392,7 +422,7 @@ func BenchmarkBinaryReader(t *testing.B) {

t.ResetTimer()
for i := 0; i < t.N; i++ {
br, err := newFileBinaryReader(fn)
br, err := newFileBinaryReader(fn, 32)
testutil.Ok(t, err)
testutil.Ok(t, br.Close())
}
28 changes: 17 additions & 11 deletions pkg/store/bucket.go
@@ -59,10 +59,8 @@ const (
// because you barely get any improvements in compression when the number of samples is beyond this.
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
maxSamplesPerChunk = 120

maxChunkSize = 16000

maxSeriesSize = 64 * 1024
maxChunkSize = 16000
maxSeriesSize = 64 * 1024

// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
@@ -76,6 +74,11 @@ const (
// TODO(bwplotka): Remove it at some point.
CompatibilityTypeLabelName = "@thanos_compatibility_store_type"

// DefaultPostingOffsetInMemorySampling represents default value for --store.index-header-posting-offsets-in-mem-sampling.
// 32 value is chosen as it's a good balance for common setups. Sampling that is not too large (too many CPU cycles) and
// not too small (too much memory).
DefaultPostingOffsetInMemorySampling = 32

partitionerMaxGapSize = 512 * 1024
)

Expand Down Expand Up @@ -252,7 +255,8 @@ type BucketStore struct {
// Reencode postings using diff+varint+snappy when storing to cache.
// This makes them smaller, but takes extra CPU and memory.
// When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.
enablePostingsCompression bool
enablePostingsCompression bool
postingOffsetsInMemSampling int
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand All @@ -273,6 +277,7 @@ func NewBucketStore(
enableCompatibilityLabel bool,
enableIndexHeader bool,
enablePostingsCompression bool,
postingOffsetsInMemSampling int,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -304,11 +309,12 @@ func NewBucketStore(
maxConcurrent,
extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg),
),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enableIndexHeader: enableIndexHeader,
enablePostingsCompression: enablePostingsCompression,
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enableIndexHeader: enableIndexHeader,
enablePostingsCompression: enablePostingsCompression,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
}
s.metrics = metrics

@@ -463,7 +469,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er

var indexHeaderReader indexheader.Reader
if s.enableIndexHeader {
indexHeaderReader, err = indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
indexHeaderReader, err = indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID, s.postingOffsetsInMemSampling)
if err != nil {
return errors.Wrap(err, "create index header reader")
}
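
For a sense of scale of the trade-off documented on DefaultPostingOffsetInMemorySampling: with sampling 32, an index holding, for example, one million label values keeps roughly 31,250 posting offsets resident (1/32), plus the first and last value per label name, while a lookup may scan up to 31 extra table entries via mmap to find an unsampled value; the figures here are illustrative.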
1 change: 1 addition & 0 deletions pkg/store/bucket_e2e_test.go
@@ -168,6 +168,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
true,
true,
true,
DefaultPostingOffsetInMemorySampling,
)
testutil.Ok(t, err)
s.store = store