Skip to content

Commit

Permalink
Handle V1 indexes, some of which have unsorted posting offset tables. (
Browse files Browse the repository at this point in the history
…prometheus#6564)

Fixes prometheus#6535

Signed-off-by: Brian Brazil <[email protected]>
  • Loading branch information
brian-brazil authored Jan 6, 2020
1 parent 577e738 commit 000e306
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 51 deletions.
26 changes: 15 additions & 11 deletions tsdb/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,16 +267,20 @@ func TestBlockSize(t *testing.T) {
}

func TestReadIndexFormatV1(t *testing.T) {
/* The block here was produced at commit
07ef80820ef1250db82f9544f3fcf7f0f63ccee0 with:
db, _ := Open("v1db", nil, nil, nil)
app := db.Appender()
app.Add(labels.FromStrings("foo", "bar"), 1, 2)
app.Add(labels.FromStrings("foo", "baz"), 3, 4)
app.Add(labels.FromStrings("foo", "meh"), 1000*3600*4, 4) // Not in the block.
app.Commit()
db.compact()
db.Close()
/* The block here was produced at the commit
706602daed1487f7849990678b4ece4599745905 used in 2.0.0 with:
db, _ := Open("v1db", nil, nil, nil)
app := db.Appender()
app.Add(labels.FromStrings("foo", "bar"), 1, 2)
app.Add(labels.FromStrings("foo", "baz"), 3, 4)
app.Add(labels.FromStrings("foo", "meh"), 1000*3600*4, 4) // Not in the block.
// Make sure we've enough values for the lack of sorting of postings offsets to show up.
for i := 0; i < 100; i++ {
app.Add(labels.FromStrings("bar", strconv.FormatInt(int64(i), 10)), 0, 0)
}
app.Commit()
db.compact()
db.Close()
*/

blockDir := filepath.Join("testdata", "index_format_v1")
Expand All @@ -290,7 +294,7 @@ func TestReadIndexFormatV1(t *testing.T) {

q, err = NewBlockQuerier(block, 0, 1000)
testutil.Ok(t, err)
testutil.Equals(t, query(t, q, labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^.$")),
testutil.Equals(t, query(t, q, labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^.?$")),
map[string][]tsdbutil.Sample{
`{foo="bar"}`: []tsdbutil.Sample{sample{t: 1, v: 2}},
`{foo="baz"}`: []tsdbutil.Sample{sample{t: 3, v: 4}},
Expand Down
126 changes: 91 additions & 35 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,8 @@ type Reader struct {
// Map of LabelName to a list of some LabelValues's position in the offset table.
// The first and last values for each name are always present.
postings map[string][]postingOffset
// For the v1 format, labelname -> labelvalue -> offset.
postingsV1 map[string]map[string]uint64

symbols *Symbols
nameSymbols map[uint32]string // Cache of the label name symbol lookups,
Expand Down Expand Up @@ -1113,45 +1115,64 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
return nil, errors.Wrap(err, "read symbols")
}

var lastKey []string
lastOff := 0
valueCount := 0
// For the postings offset table we keep every label name but only every nth
// label value (plus the first and last one), to save memory.
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error {
if len(key) != 2 {
return errors.Errorf("unexpected key length for posting table %d", len(key))
if r.version == FormatV1 {
// Earlier V1 formats don't have a sorted postings offset table, so
// load the whole offset table into memory.
r.postingsV1 = map[string]map[string]uint64{}
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error {
if len(key) != 2 {
return errors.Errorf("unexpected key length for posting table %d", len(key))
}
if _, ok := r.postingsV1[key[0]]; !ok {
r.postingsV1[key[0]] = map[string]uint64{}
r.postings[key[0]] = nil // Used to get a list of labelnames in places.
}
r.postingsV1[key[0]][key[1]] = off
return nil
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
if _, ok := r.postings[key[0]]; !ok {
// Next label name.
r.postings[key[0]] = []postingOffset{}
if lastKey != nil {
// Always include last value for each label name.
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
} else {
var lastKey []string
lastOff := 0
valueCount := 0
// For the postings offset table we keep every label name but only every nth
// label value (plus the first and last one), to save memory.
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error {
if len(key) != 2 {
return errors.Errorf("unexpected key length for posting table %d", len(key))
}
if _, ok := r.postings[key[0]]; !ok {
// Next label name.
r.postings[key[0]] = []postingOffset{}
if lastKey != nil {
// Always include last value for each label name.
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
}
lastKey = nil
valueCount = 0
}
lastKey = nil
valueCount = 0
if valueCount%32 == 0 {
r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], off: off})
lastKey = nil
} else {
lastKey = key
lastOff = off
}
valueCount++
return nil
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
if valueCount%32 == 0 {
r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], off: off})
lastKey = nil
} else {
lastKey = key
lastOff = off
if lastKey != nil {
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
}
// Trim any extra space in the slices.
for k, v := range r.postings {
l := make([]postingOffset, len(v))
copy(l, v)
r.postings[k] = l
}
valueCount++
return nil
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
if lastKey != nil {
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
}
// Trim any extra space in the slices.
for k, v := range r.postings {
l := make([]postingOffset, len(v))
copy(l, v)
r.postings[k] = l
}

r.nameSymbols = make(map[uint32]string, len(r.postings))
Expand Down Expand Up @@ -1408,6 +1429,19 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
if len(names) != 1 {
return nil, errors.Errorf("only one label name supported")
}
if r.version == FormatV1 {
e, ok := r.postingsV1[names[0]]
if !ok {
return emptyStringTuples{}, nil
}
values := make([]string, 0, len(e))
for k := range e {
values = append(values, k)
}
sort.Strings(values)
return NewStringTuples(values, 1)

}
e, ok := r.postings[names[0]]
if !ok {
return emptyStringTuples{}, nil
Expand Down Expand Up @@ -1467,6 +1501,28 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
}

func (r *Reader) Postings(name string, values ...string) (Postings, error) {
if r.version == FormatV1 {
e, ok := r.postingsV1[name]
if !ok {
return EmptyPostings(), nil
}
res := make([]Postings, 0, len(values))
for _, v := range values {
postingsOff, ok := e[v]
if !ok {
continue
}
// Read from the postings table.
d := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.Postings(d.Get())
if err != nil {
return nil, errors.Wrap(err, "decode postings")
}
res = append(res, p)
}
return Merge(res...), nil
}

e, ok := r.postings[name]
if !ok {
return EmptyPostings(), nil
Expand Down
Binary file modified tsdb/testdata/index_format_v1/chunks/000001
Binary file not shown.
Binary file modified tsdb/testdata/index_format_v1/index
Binary file not shown.
10 changes: 5 additions & 5 deletions tsdb/testdata/index_format_v1/meta.json
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
{
"version": 1,
"ulid": "01DVZX4CHY2EGZ6JQVS80AB9CF",
"ulid": "01DXXFZDYD1MQW6079WK0K6EDQ",
"minTime": 0,
"maxTime": 7200000,
"stats": {
"numSamples": 2,
"numSeries": 2,
"numChunks": 2
"numSamples": 102,
"numSeries": 102,
"numChunks": 102
},
"compaction": {
"level": 1,
"sources": [
"01DVZX4CHY2EGZ6JQVS80AB9CF"
"01DXXFZDYD1MQW6079WK0K6EDQ"
]
}
}

0 comments on commit 000e306

Please sign in to comment.