diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index a6fd7159f1..095432a80a 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "sort" + "time" "unsafe" "github.com/go-kit/kit/log" @@ -39,6 +40,8 @@ const ( MagicIndex = 0xBAAAD792 symbolFactor = 32 + + postingLengthFieldSize = 4 ) // The table gets initialized with sync.Once but may still cause a race @@ -60,7 +63,7 @@ func newCRC32() hash.Hash32 { type BinaryTOC struct { // Symbols holds start to the same symbols section as index related to this index header. Symbols uint64 - // PostingsTable holds start to the the same Postings Offset Table section as index related to this index header. + // PostingsOffsetTable holds start to the same Postings Offset Table section as index related to this index header. PostingsOffsetTable uint64 } @@ -383,6 +386,11 @@ func (w *binaryWriter) Close() error { return w.f.Close() } +type postingValueOffsets struct { + offsets []postingOffset + lastValOffset int64 +} + type postingOffset struct { // label value. value string @@ -399,7 +407,7 @@ type BinaryReader struct { // Map of LabelName to a list of some LabelValues's position in the offset table. // The first and last values for each name are always present. - postings map[string][]postingOffset + postings map[string]*postingValueOffsets // For the v1 format, labelname -> labelvalue -> offset. 
postingsV1 map[string]map[string]index.Range @@ -422,13 +430,14 @@ func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bucket return br, nil } - level.Warn(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err) + level.Debug(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err) + start := time.Now() if err := WriteBinary(ctx, bkt, id, binfn); err != nil { return nil, errors.Wrap(err, "write index header") } - level.Debug(logger).Log("msg", "build index-header file", "path", binfn, "err", err) + level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start)) return newFileBinaryReader(binfn) } @@ -447,7 +456,7 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) { r := &BinaryReader{ b: realByteSlice(f.Bytes()), c: f, - postings: map[string][]postingOffset{}, + postings: map[string]*postingValueOffsets{}, } // Verify header. @@ -487,9 +496,9 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) { if len(key) != 2 { return errors.Errorf("unexpected key length for posting table %d", len(key)) } - // TODO(bwplotka): This is wrong, probably we have to sort. 
+ if lastKey != nil { - prevRng.End = int64(off + 4) + prevRng.End = int64(off - crc32.Size) r.postingsV1[lastKey[0]][lastKey[1]] = prevRng } @@ -499,13 +508,13 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) { } lastKey = key - prevRng = index.Range{Start: int64(off + 4)} + prevRng = index.Range{Start: int64(off + postingLengthFieldSize)} return nil }); err != nil { return nil, errors.Wrap(err, "read postings table") } if lastKey != nil { - prevRng.End = r.indexLastPostingEnd + 4 + prevRng.End = r.indexLastPostingEnd - crc32.Size r.postingsV1[lastKey[0]][lastKey[1]] = prevRng } } else { @@ -521,20 +530,24 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) { if _, ok := r.postings[key[0]]; !ok { // Next label name. - r.postings[key[0]] = []postingOffset{} + r.postings[key[0]] = &postingValueOffsets{} if lastKey != nil { - // Always include last value for each label name. - r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], tableOff: lastTableOff}) + if valueCount%symbolFactor != 0 { + // Always include last value for each label name. 
+ r.postings[lastKey[0]].offsets = append(r.postings[lastKey[0]].offsets, postingOffset{value: lastKey[1], tableOff: lastTableOff}) + } + r.postings[lastKey[0]].lastValOffset = int64(off - crc32.Size) + lastKey = nil } valueCount = 0 } + lastKey = key if valueCount%symbolFactor == 0 { - r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], tableOff: tableOff}) - lastKey = nil + r.postings[key[0]].offsets = append(r.postings[key[0]].offsets, postingOffset{value: key[1], tableOff: tableOff}) return nil } - lastKey = key + lastTableOff = tableOff valueCount++ return nil @@ -542,13 +555,16 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) { return nil, errors.Wrap(err, "read postings table") } if lastKey != nil { - r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], tableOff: lastTableOff}) + if valueCount%symbolFactor != 0 { + r.postings[lastKey[0]].offsets = append(r.postings[lastKey[0]].offsets, postingOffset{value: lastKey[1], tableOff: lastTableOff}) + } + r.postings[lastKey[0]].lastValOffset = r.indexLastPostingEnd - crc32.Size } // Trim any extra space in the slices. for k, v := range r.postings { - l := make([]postingOffset, len(v)) - copy(l, v) - r.postings[k] = l + l := make([]postingOffset, len(v.offsets)) + copy(l, v.offsets) + r.postings[k].offsets = l } } @@ -637,7 +653,7 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran skip := 0 valueIndex := 0 - for valueIndex < len(values) && values[valueIndex] < e[0].value { + for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value { // Discard values before the start. 
valueIndex++ } @@ -646,19 +662,19 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran for valueIndex < len(values) { value := values[valueIndex] - i := sort.Search(len(e), func(i int) bool { return e[i].value >= value }) - if i == len(e) { + i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= value }) + if i == len(e.offsets) { // We're past the end. break } - if i > 0 && e[i].value != value { + if i > 0 && e.offsets[i].value != value { // Need to look from previous entry. i-- } // Don't Crc32 the entire postings offset table, this is very slow // so hope any issues were caught at startup. d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil) - d.Skip(e[i].tableOff) + d.Skip(e.offsets[i].tableOff) tmpRngs = tmpRngs[:0] // Iterate on the offset table. @@ -677,8 +693,7 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran postingOffset := int64(d.Uvarint64()) // Offset. for string(v) >= value { if string(v) == value { - // Actual posting is 4 bytes after offset, which includes length. - tmpRngs = append(tmpRngs, index.Range{Start: postingOffset + 4}) + tmpRngs = append(tmpRngs, index.Range{Start: postingOffset + postingLengthFieldSize}) } valueIndex++ if valueIndex == len(values) { @@ -686,22 +701,21 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran } value = values[valueIndex] } - if i+1 == len(e) { + if i+1 == len(e.offsets) { for i := range tmpRngs { - tmpRngs[i].End = r.indexLastPostingEnd + tmpRngs[i].End = e.lastValOffset } rngs = append(rngs, tmpRngs...) // Need to go to a later postings offset entry, if there is one. break } - if value >= e[i+1].value || valueIndex == len(values) { + if value >= e.offsets[i+1].value || valueIndex == len(values) { d.Skip(skip) d.UvarintBytes() // Label value. postingOffset := int64(d.Uvarint64()) // Offset. for j := range tmpRngs { - // Actual posting end is 4 bytes before next offset. 
- tmpRngs[j].End = postingOffset - 4 + tmpRngs[j].End = postingOffset - crc32.Size } rngs = append(rngs, tmpRngs...) // Need to go to a later postings offset entry, if there is one. @@ -748,14 +762,14 @@ func (r BinaryReader) LabelValues(name string) ([]string, error) { if !ok { return nil, nil } - if len(e) == 0 { + if len(e.offsets) == 0 { return nil, nil } - values := make([]string, 0, len(e)*symbolFactor) + values := make([]string, 0, len(e.offsets)*symbolFactor) d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil) - d.Skip(e[0].tableOff) - lastVal := e[len(e)-1].value + d.Skip(e.offsets[0].tableOff) + lastVal := e.offsets[len(e.offsets)-1].value skip := 0 for d.Err() == nil { diff --git a/pkg/block/indexheader/header.go b/pkg/block/indexheader/header.go index 39934d5da9..04b3f14049 100644 --- a/pkg/block/indexheader/header.go +++ b/pkg/block/indexheader/header.go @@ -18,7 +18,7 @@ type Reader interface { IndexVersion() int // PostingsOffset returns start and end offsets of postings for given name and value. - // end offset might be bigger than actual posting ending, but not larger then the whole index file. + // The end offset might be bigger than the actual posting ending, but not larger than the whole index file. // NotFoundRangeErr is returned when no index can be found for given name and value. // TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark. PostingsOffset(name string, value string) (index.Range, error) @@ -27,7 +27,9 @@ type Reader interface { // Error is return if the symbol can't be found. LookupSymbol(o uint32) (string, error) - // LabelValues returns all label values for given label name or error if not found. + // LabelValues returns all label values for given label name or error. + // If no values are found for label name, or label name does not exist, + // then an empty slice is returned and no error. LabelValues(name string) ([]string, error) // LabelNames returns all label names. 
diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 0f588d98f4..0fb8af221e 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -174,7 +174,7 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe minStart := int64(math.MaxInt64) maxEnd := int64(math.MinInt64) - for _, lname := range expLabelNames { + for il, lname := range expLabelNames { expectedLabelVals, err := indexReader.LabelValues(lname) testutil.Ok(t, err) @@ -182,7 +182,7 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe testutil.Ok(t, err) testutil.Equals(t, expectedLabelVals, vals) - for i, v := range vals { + for iv, v := range vals { if minStart > expRanges[labels.Label{Name: lname, Value: v}].Start { minStart = expRanges[labels.Label{Name: lname, Value: v}].Start } @@ -195,13 +195,21 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe // For index-cache those values are exact. // - // For binary they are exact except: - // * formatV2: last item posting offset. It's good enough if the value is larger than exact posting ending. - // * formatV1: all items. - if i == len(vals)-1 || indexReader.Version() == index.FormatV1 { - testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start) - testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End) - continue + // For binary they are exact except last item posting offset. It's good enough if the value is larger than exact posting ending. 
+ if indexReader.Version() == index.FormatV2 { + if iv == len(vals)-1 && il == len(expLabelNames)-1 { + testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start) + testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End) + continue + } + } else { + // For index formatV1 the last one does not mean literally last value, as postings were not sorted. + // Account for that. We know it's 40 label value. + if v == "40" { + testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start) + testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End) + continue + } } testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}], ptr) } @@ -209,18 +217,13 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe ptr, err := headerReader.PostingsOffset(index.AllPostingsKey()) testutil.Ok(t, err) - // For AllPostingsKey ending has also too large ending which is well handled further on. 
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].Start, ptr.Start) - testutil.Assert(t, expRanges[labels.Label{Name: "", Value: ""}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: "", Value: ""}].End) + testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].End, ptr.End) vals, err := indexReader.LabelValues("not-existing") testutil.Ok(t, err) testutil.Equals(t, []string(nil), vals) - vals, err = headerReader.LabelValues("not-existing") - testutil.Ok(t, err) - testutil.Equals(t, []string(nil), vals) - _, err = headerReader.PostingsOffset("not-existing", "1") testutil.NotOk(t, err) } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 7d92bbef25..bd99f488c9 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1565,8 +1565,13 @@ func resizePostings(b []byte) ([]byte, error) { if d.Err() != nil { return nil, errors.Wrap(d.Err(), "read postings list") } - // 4 for posting length, then n * 4, foreach each big endian posting. - return b[:4+n*4], nil + + // 4 for the number of postings, then 4 for each big endian posting. + size := 4 + n*4 + if len(b) < size { + return nil, encoding.ErrInvalidSize + } + return b[:size], nil } // bigEndianPostings implements the Postings interface over a byte stream of