From 18d9fed5affde65431e3318c6b993995da2df3ba Mon Sep 17 00:00:00 2001 From: Vladimir Kononov Date: Tue, 23 Mar 2021 18:42:41 +0300 Subject: [PATCH] Apply code review suggestions Signed-off-by: Vladimir Kononov --- pkg/store/bucket.go | 97 ++++++++++++++-------------- pkg/store/bucket_test.go | 8 ++- pkg/store/storepb/testutil/series.go | 16 +++-- 3 files changed, 63 insertions(+), 58 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 318e0b1a8d5..e068ee1a1b7 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -764,9 +764,10 @@ func blockSeries( s.refs = make([]uint64, 0, len(chks)) s.chks = make([]storepb.AggrChunk, 0, len(chks)) for j, meta := range chks { - // s is appended to res, but not at every iteration, hence len(res) is the index we need. - if err := chunkr.addPreload(meta.Ref, len(res), j); err != nil { - return nil, nil, errors.Wrap(err, "add chunk preload") + // seriesEntry s is appended to res, but not at every outer loop iteration, + // therefore len(res) is the index we need here, not outer loop iteration number. + if err := chunkr.addLoad(meta.Ref, len(res), j); err != nil { + return nil, nil, errors.Wrap(err, "add chunk load") } s.chks = append(s.chks, storepb.AggrChunk{ MinTime: meta.MinTime, @@ -792,16 +793,16 @@ func blockSeries( return newBucketSeriesSet(res), indexr.stats, nil } - if err := chunkr.preload(res, req.Aggregates); err != nil { - return nil, nil, errors.Wrap(err, "preload chunks") + if err := chunkr.load(res, req.Aggregates); err != nil { + return nil, nil, errors.Wrap(err, "load chunks") } return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } -func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, savior func([]byte) ([]byte, error)) error { +func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error)) error { if in.Encoding() == chunkenc.EncXOR { - b, err := savior(in.Bytes()) + b, err := save(in.Bytes()) if err != nil { return err } @@ -821,7 +822,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrCount) } - b, err := savior(x.Bytes()) + b, err := save(x.Bytes()) if err != nil { return err } @@ -831,7 +832,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrSum) } - b, err := savior(x.Bytes()) + b, err := save(x.Bytes()) if err != nil { return err } @@ -841,7 +842,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrMin) } - b, err := savior(x.Bytes()) + b, err := save(x.Bytes()) if err != nil { return err } @@ -851,7 +852,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrMax) } - b, err := savior(x.Bytes()) + b, err := save(x.Bytes()) if err != nil { return err } @@ -861,7 +862,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return errors.Errorf("aggregate %s does not exist", downsample.AggrCounter) } - b, err := savior(x.Bytes()) + b, err := save(x.Bytes()) if err != nil { return err } @@ -2242,9 +2243,10 @@ func decodeSeriesForTime(b []byte, lset *[]symbolizedLabel, chks *[]chunks.Meta, } type preloadIdx struct { - off uint32 - i int - j int + offset uint32 + // Indices, not actual entries and chunks. + seriesEntry int + chunk int } type bucketChunkReader struct { @@ -2267,20 +2269,7 @@ func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkR block: block, stats: &queryStats{}, preloads: make([][]preloadIdx, len(block.chunkObjs)), - chunks: map[uint64]chunkenc.Chunk{}, - } -} - -func (r *bucketChunkReader) Chunk(id uint64) (chunkenc.Chunk, error) { - c, ok := r.chunks[id] - if !ok { - return nil, errors.Errorf("chunk with ID %d not found", id) } - - r.stats.chunksTouched++ - r.stats.chunksTouchedSizeSum += len(c.Bytes()) - - return c, nil } func (r *bucketChunkReader) Close() error { @@ -2292,8 +2281,9 @@ func (r *bucketChunkReader) Close() error { return nil } -// appPreload adds the chunk with id to the data set that will be fetched on calling preload. -func (r *bucketChunkReader) addPreload(id uint64, i, j int) error { +// addLoad adds the chunk with id to the data set to be fetched. +// Chunk will be fetched and saved to res[seriesEntry][chunk] upon r.load(res, <...>) call. +func (r *bucketChunkReader) addLoad(id uint64, seriesEntry, chunk int) error { var ( seq = int(id >> 32) off = uint32(id) @@ -2301,20 +2291,20 @@ func (r *bucketChunkReader) addPreload(id uint64, i, j int) error { if seq >= len(r.preloads) { return errors.Errorf("reference sequence %d out of range", seq) } - r.preloads[seq] = append(r.preloads[seq], preloadIdx{off, i, j}) + r.preloads[seq] = append(r.preloads[seq], preloadIdx{off, seriesEntry, chunk}) return nil } -// preload all added chunk IDs. Must be called before the first call to Chunk is made. -func (r *bucketChunkReader) preload(res []seriesEntry, aggrs []storepb.Aggr) error { +// load loads all added chunks and saves resulting aggrs to res. +func (r *bucketChunkReader) load(res []seriesEntry, aggrs []storepb.Aggr) error { g, ctx := errgroup.WithContext(r.ctx) for seq, pIdxs := range r.preloads { sort.Slice(pIdxs, func(i, j int) bool { - return pIdxs[i].off < pIdxs[j].off + return pIdxs[i].offset < pIdxs[j].offset }) parts := r.block.partitioner.Partition(len(pIdxs), func(i int) (start, end uint64) { - return uint64(pIdxs[i].off), uint64(pIdxs[i].off) + EstimatedMaxChunkSize + return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + EstimatedMaxChunkSize }) for _, p := range parts { @@ -2358,7 +2348,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a var ( buf = make([]byte, EstimatedMaxChunkSize) - readOffset = int(pIdxs[0].off) + readOffset = int(pIdxs[0].offset) // Save a few allocations. written int64 @@ -2369,17 +2359,19 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a for i, pIdx := range pIdxs { // Fast forward range reader to the next chunk start in case of sparse (for our purposes) byte range. - for readOffset < int(pIdx.off) { - written, err = io.CopyN(ioutil.Discard, bufReader, int64(pIdx.off)-int64(readOffset)) + for readOffset < int(pIdx.offset) { + written, err = io.CopyN(ioutil.Discard, bufReader, int64(pIdx.offset)-int64(readOffset)) if err != nil { return errors.Wrap(err, "fast forward range reader") } readOffset += int(written) } - // Presume chunk length. + // Presume chunk length to be reasonably large for common use cases. + // However, declaration for EstimatedMaxChunkSize warns us some chunks could be larger in some rare cases. + // This is handled further down below. chunkLen = EstimatedMaxChunkSize if i+1 < len(pIdxs) { - if diff = pIdxs[i+1].off - pIdx.off; int(diff) < chunkLen { + if diff = pIdxs[i+1].offset - pIdx.offset; int(diff) < chunkLen { chunkLen = int(diff) } } @@ -2389,21 +2381,23 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a if errors.Is(err, io.ErrUnexpectedEOF) && i == len(pIdxs)-1 { // This may be a valid case. } else if err != nil { - return errors.Wrapf(err, "read range for seq %d offset %x", seq, pIdx.off) + return errors.Wrapf(err, "read range for seq %d offset %x", seq, pIdx.offset) } - l, n := binary.Uvarint(cb) + chunkDataLen, n := binary.Uvarint(cb) if n < 1 { return errors.New("reading chunk length failed") } - // Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and l for actual chunk data. + // Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and chunkDataLen for actual chunk data. // There is also crc32 after the chunk, but we ignore that. - chunkLen = n + 1 + int(l) + chunkLen = n + 1 + int(chunkDataLen) if chunkLen <= len(cb) { - if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk(cb[n:chunkLen]), aggrs, r.savior); err != nil { + if err := populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save); err != nil { return errors.Wrap(err, "populate chunk") } + r.stats.chunksTouched++ + r.stats.chunksTouchedSizeSum += int(chunkDataLen) continue } @@ -2414,7 +2408,8 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a fetchBegin = time.Now() // Read entire chunk into new buffer. - nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.off), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}) + // TODO: readChunkRange call could be avoided for any chunk but last in this particular part. + nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}) if err != nil { return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen) } @@ -2428,17 +2423,21 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a r.stats.chunksFetchCount++ r.stats.chunksFetchDurationSum += time.Since(fetchBegin) r.stats.chunksFetchedSizeSum += len(*nb) - - if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk((*nb)[n:]), aggrs, r.savior); err != nil { + if err := populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save); err != nil { + r.block.chunkPool.Put(nb) return errors.Wrap(err, "populate chunk") } + r.stats.chunksTouched++ + r.stats.chunksTouchedSizeSum += int(chunkDataLen) r.block.chunkPool.Put(nb) } return nil } -func (r *bucketChunkReader) savior(b []byte) ([]byte, error) { +// save saves a copy of b's payload to a memory pool of its own and returns a new byte slice referencing said copy. +// Returned slice becomes invalid once r.block.chunkPool.Put() is called. +func (r *bucketChunkReader) save(b []byte) ([]byte, error) { // Ensure we never grow slab beyond original capacity. if len(r.chunkBytes) == 0 || cap(*r.chunkBytes[len(r.chunkBytes)-1])-len(*r.chunkBytes[len(r.chunkBytes)-1]) < len(b) { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index ecad9567fd9..f165a6785de 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -20,6 +20,7 @@ import ( "strconv" "sync" "testing" + "time" "github.com/go-kit/kit/log" "github.com/gogo/protobuf/proto" @@ -1053,6 +1054,7 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in }, nil) testutil.Ok(t, err) testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) return id } @@ -2328,7 +2330,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck head, _ := storetestutil.CreateHeadWithSeries(b, 0, storetestutil.HeadGenOptions{ TSDBDir: filepath.Join(tmpDir, "head"), SamplesPerSeries: 86400 / 15, // Simulate 1 day block with 15s scrape interval. - ScrapeInterval: 15 * 1000, + ScrapeInterval: 15 * time.Second, Series: 1000, PrependLabels: nil, Random: rand.New(rand.NewSource(120)), @@ -2349,7 +2351,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) if resolutionLevel > 0 { - // Downsample newly-created block + // Downsample newly-created block. blockID, err = downsample.Downsample(logger, blockMeta, head, tmpDir, int64(resolutionLevel)) testutil.Ok(b, err) blockMeta, err = metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String())) @@ -2412,6 +2414,8 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet } matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) + // TODO FIXME! testutil.Ok calls b.Fatalf under the hood, which + // must be called only from the goroutine running the Benchmark function. testutil.Ok(b, err) indexReader := blk.indexReader(ctx) diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index 46faece2a62..cb9305cfaaa 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -13,6 +13,7 @@ import ( "runtime" "sort" "testing" + "time" "github.com/gogo/protobuf/types" "github.com/prometheus/prometheus/pkg/labels" @@ -39,8 +40,9 @@ func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { } type HeadGenOptions struct { - TSDBDir string - SamplesPerSeries, Series, ScrapeInterval int + TSDBDir string + SamplesPerSeries, Series int + ScrapeInterval time.Duration WithWAL bool PrependLabels labels.Labels @@ -58,14 +60,14 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, t.Fatal("samples and series has to be 1 or more") } if opts.ScrapeInterval == 0 { - opts.ScrapeInterval = 1 + opts.ScrapeInterval = 1 * time.Millisecond } fmt.Printf( - "Creating %d %d-sample series with %d ms interval in %s\n", + "Creating %d %d-sample series with %s interval in %s\n", opts.Series, opts.SamplesPerSeries, - opts.ScrapeInterval, + opts.ScrapeInterval.String(), opts.TSDBDir, ) @@ -88,13 +90,13 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, tsLabel := j*opts.Series*opts.SamplesPerSeries + i*opts.SamplesPerSeries ref, err := app.Add( labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix)), - int64(tsLabel*opts.ScrapeInterval), + int64(tsLabel)*opts.ScrapeInterval.Milliseconds(), opts.Random.Float64(), ) testutil.Ok(t, err) for is := 1; is < opts.SamplesPerSeries; is++ { - testutil.Ok(t, app.AddFast(ref, int64((tsLabel+is)*opts.ScrapeInterval), opts.Random.Float64())) + testutil.Ok(t, app.AddFast(ref, int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), opts.Random.Float64())) } } testutil.Ok(t, app.Commit())