Commit: Apply code review suggestions

Signed-off-by: Vladimir Kononov <[email protected]>
krya-kryak committed Mar 23, 2021
1 parent 07978c9 commit 18d9fed
Showing 3 changed files with 63 additions and 58 deletions.
97 changes: 48 additions & 49 deletions pkg/store/bucket.go
@@ -764,9 +764,10 @@ func blockSeries(
s.refs = make([]uint64, 0, len(chks))
s.chks = make([]storepb.AggrChunk, 0, len(chks))
for j, meta := range chks {
// s is appended to res, but not at every iteration, hence len(res) is the index we need.
if err := chunkr.addPreload(meta.Ref, len(res), j); err != nil {
return nil, nil, errors.Wrap(err, "add chunk preload")
// seriesEntry s is appended to res, but not at every outer loop iteration,
// therefore len(res) is the index we need here, not outer loop iteration number.
if err := chunkr.addLoad(meta.Ref, len(res), j); err != nil {
return nil, nil, errors.Wrap(err, "add chunk load")
}
s.chks = append(s.chks, storepb.AggrChunk{
MinTime: meta.MinTime,
@@ -792,16 +793,16 @@ func blockSeries(
return newBucketSeriesSet(res), indexr.stats, nil
}

if err := chunkr.preload(res, req.Aggregates); err != nil {
return nil, nil, errors.Wrap(err, "preload chunks")
if err := chunkr.load(res, req.Aggregates); err != nil {
return nil, nil, errors.Wrap(err, "load chunks")
}

return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
}

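The new comment in blockSeries spells out why `len(res)` is passed to `addLoad` as the series index: the series entry is appended to `res` only once per series, after its chunks have been registered, so at registration time `len(res)` already equals the index the entry will occupy. A stripped-down illustration of that invariant (all names here are made up for the example, not taken from the diff):

```go
package main

import "fmt"

type entry struct{ chunks []int }

func main() {
	var res []entry
	input := [][]int{{10, 11}, {}, {20}} // chunk refs per candidate series; empty series are skipped

	for _, chks := range input {
		if len(chks) == 0 {
			continue // nothing appended, len(res) unchanged
		}
		e := entry{}
		for j, ref := range chks {
			// len(res) is the index e will get once appended below,
			// which is why it is safe to record it before the append.
			fmt.Printf("schedule load of chunk %d into res[%d].chunks[%d]\n", ref, len(res), j)
			e.chunks = append(e.chunks, 0) // placeholder, filled later by the loader
		}
		res = append(res, e)
	}
}
```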
func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, savior func([]byte) ([]byte, error)) error {
func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error)) error {
if in.Encoding() == chunkenc.EncXOR {
b, err := savior(in.Bytes())
b, err := save(in.Bytes())
if err != nil {
return err
}
@@ -821,7 +822,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return errors.Errorf("aggregate %s does not exist", downsample.AggrCount)
}
b, err := savior(x.Bytes())
b, err := save(x.Bytes())
if err != nil {
return err
}
@@ -831,7 +832,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return errors.Errorf("aggregate %s does not exist", downsample.AggrSum)
}
b, err := savior(x.Bytes())
b, err := save(x.Bytes())
if err != nil {
return err
}
@@ -841,7 +842,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return errors.Errorf("aggregate %s does not exist", downsample.AggrMin)
}
b, err := savior(x.Bytes())
b, err := save(x.Bytes())
if err != nil {
return err
}
@@ -851,7 +852,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return errors.Errorf("aggregate %s does not exist", downsample.AggrMax)
}
b, err := savior(x.Bytes())
b, err := save(x.Bytes())
if err != nil {
return err
}
@@ -861,7 +862,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
if err != nil {
return errors.Errorf("aggregate %s does not exist", downsample.AggrCounter)
}
b, err := savior(x.Bytes())
b, err := save(x.Bytes())
if err != nil {
return err
}
@@ -2242,9 +2243,10 @@ func decodeSeriesForTime(b []byte, lset *[]symbolizedLabel, chks *[]chunks.Meta,
}

type preloadIdx struct {
off uint32
i int
j int
offset uint32
// Indices, not actual entries and chunks.
seriesEntry int
chunk int
}

type bucketChunkReader struct {
@@ -2267,20 +2269,7 @@ func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkR
block: block,
stats: &queryStats{},
preloads: make([][]preloadIdx, len(block.chunkObjs)),
chunks: map[uint64]chunkenc.Chunk{},
}
}

func (r *bucketChunkReader) Chunk(id uint64) (chunkenc.Chunk, error) {
c, ok := r.chunks[id]
if !ok {
return nil, errors.Errorf("chunk with ID %d not found", id)
}

r.stats.chunksTouched++
r.stats.chunksTouchedSizeSum += len(c.Bytes())

return c, nil
}

func (r *bucketChunkReader) Close() error {
Expand All @@ -2292,29 +2281,30 @@ func (r *bucketChunkReader) Close() error {
return nil
}

// appPreload adds the chunk with id to the data set that will be fetched on calling preload.
func (r *bucketChunkReader) addPreload(id uint64, i, j int) error {
// addLoad adds the chunk with id to the data set to be fetched.
// Chunk will be fetched and saved to res[seriesEntry][chunk] upon r.load(res, <...>) call.
func (r *bucketChunkReader) addLoad(id uint64, seriesEntry, chunk int) error {
var (
seq = int(id >> 32)
off = uint32(id)
)
if seq >= len(r.preloads) {
return errors.Errorf("reference sequence %d out of range", seq)
}
r.preloads[seq] = append(r.preloads[seq], preloadIdx{off, i, j})
r.preloads[seq] = append(r.preloads[seq], preloadIdx{off, seriesEntry, chunk})
return nil
}

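addLoad splits the chunk reference exactly as shown above: the segment-file sequence lives in the upper 32 bits of the id and the byte offset within that segment in the lower 32. A small self-contained sketch of that packing and unpacking (the helper names are mine, not part of the code in this diff):

```go
package main

import "fmt"

// packRef mirrors the id layout addLoad assumes: sequence in the high
// 32 bits, byte offset within that segment file in the low 32 bits.
func packRef(seq int, offset uint32) uint64 {
	return uint64(seq)<<32 | uint64(offset)
}

func unpackRef(id uint64) (seq int, offset uint32) {
	return int(id >> 32), uint32(id)
}

func main() {
	id := packRef(3, 0x1F40) // chunk at offset 8000 in segment file 000003
	seq, off := unpackRef(id)
	fmt.Printf("id=%#x seq=%d offset=%d\n", id, seq, off)
}
```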
// preload all added chunk IDs. Must be called before the first call to Chunk is made.
func (r *bucketChunkReader) preload(res []seriesEntry, aggrs []storepb.Aggr) error {
// load loads all added chunks and saves resulting aggrs to res.
func (r *bucketChunkReader) load(res []seriesEntry, aggrs []storepb.Aggr) error {
g, ctx := errgroup.WithContext(r.ctx)

for seq, pIdxs := range r.preloads {
sort.Slice(pIdxs, func(i, j int) bool {
return pIdxs[i].off < pIdxs[j].off
return pIdxs[i].offset < pIdxs[j].offset
})
parts := r.block.partitioner.Partition(len(pIdxs), func(i int) (start, end uint64) {
return uint64(pIdxs[i].off), uint64(pIdxs[i].off) + EstimatedMaxChunkSize
return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + EstimatedMaxChunkSize
})

for _, p := range parts {
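load sorts the pending reads by offset and asks the block's partitioner to coalesce them into larger byte ranges, padding each start by EstimatedMaxChunkSize. As a rough illustration only (the real partitioner in this package has its own gap and size limits that are not visible in this diff), a gap-based coalescing step could look like:

```go
package main

import (
	"fmt"
	"sort"
)

type byteRange struct{ start, end uint64 }

// coalesce merges sorted per-chunk ranges whose gap is at most maxGap.
// This is an illustrative stand-in, not the partitioner used above.
func coalesce(offsets []uint64, estMaxChunk, maxGap uint64) []byteRange {
	sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] })
	var parts []byteRange
	for _, off := range offsets {
		start, end := off, off+estMaxChunk
		if n := len(parts); n > 0 && start <= parts[n-1].end+maxGap {
			if end > parts[n-1].end {
				parts[n-1].end = end
			}
			continue
		}
		parts = append(parts, byteRange{start, end})
	}
	return parts
}

func main() {
	fmt.Println(coalesce([]uint64{100, 200, 16_000, 50_000}, 16_000, 512))
}
```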
@@ -2358,7 +2348,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a

var (
buf = make([]byte, EstimatedMaxChunkSize)
readOffset = int(pIdxs[0].off)
readOffset = int(pIdxs[0].offset)

// Save a few allocations.
written int64
Expand All @@ -2369,17 +2359,19 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a

for i, pIdx := range pIdxs {
// Fast forward range reader to the next chunk start in case of sparse (for our purposes) byte range.
for readOffset < int(pIdx.off) {
written, err = io.CopyN(ioutil.Discard, bufReader, int64(pIdx.off)-int64(readOffset))
for readOffset < int(pIdx.offset) {
written, err = io.CopyN(ioutil.Discard, bufReader, int64(pIdx.offset)-int64(readOffset))
if err != nil {
return errors.Wrap(err, "fast forward range reader")
}
readOffset += int(written)
}
// Presume chunk length.
// Presume chunk length to be reasonably large for common use cases.
// However, declaration for EstimatedMaxChunkSize warns us some chunks could be larger in some rare cases.
// This is handled further down below.
chunkLen = EstimatedMaxChunkSize
if i+1 < len(pIdxs) {
if diff = pIdxs[i+1].off - pIdx.off; int(diff) < chunkLen {
if diff = pIdxs[i+1].offset - pIdx.offset; int(diff) < chunkLen {
chunkLen = int(diff)
}
}
@@ -2389,21 +2381,23 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
if errors.Is(err, io.ErrUnexpectedEOF) && i == len(pIdxs)-1 {
// This may be a valid case.
} else if err != nil {
return errors.Wrapf(err, "read range for seq %d offset %x", seq, pIdx.off)
return errors.Wrapf(err, "read range for seq %d offset %x", seq, pIdx.offset)
}

l, n := binary.Uvarint(cb)
chunkDataLen, n := binary.Uvarint(cb)
if n < 1 {
return errors.New("reading chunk length failed")
}

// Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and l for actual chunk data.
// Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and chunkDataLen for actual chunk data.
// There is also crc32 after the chunk, but we ignore that.
chunkLen = n + 1 + int(l)
chunkLen = n + 1 + int(chunkDataLen)
if chunkLen <= len(cb) {
if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk(cb[n:chunkLen]), aggrs, r.savior); err != nil {
if err := populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save); err != nil {
return errors.Wrap(err, "populate chunk")
}
r.stats.chunksTouched++
r.stats.chunksTouchedSizeSum += int(chunkDataLen)
continue
}

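The hunk above relies on the on-disk chunk framing described in its comment: a uvarint carrying the data length, one byte of chunk encoding, the chunk data itself, and a trailing CRC32 that the reader ignores here. A minimal parser for that framing, under the assumption that the whole frame is already in memory:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// parseChunkFrame decodes <uvarint data len><1 byte encoding><data><4 byte crc32>.
// It assumes buf starts at the chunk's offset and contains the full frame.
func parseChunkFrame(buf []byte) (enc byte, data []byte, err error) {
	dataLen, n := binary.Uvarint(buf)
	if n < 1 {
		return 0, nil, errors.New("reading chunk length failed")
	}
	frameLen := n + 1 + int(dataLen) // same arithmetic as chunkLen above, crc32 excluded
	if len(buf) < frameLen {
		return 0, nil, fmt.Errorf("need %d bytes, have %d", frameLen, len(buf))
	}
	return buf[n], buf[n+1 : frameLen], nil
}

func main() {
	// len=3, encoding=0x01, three data bytes, four crc bytes.
	frame := append([]byte{3, 0x01}, []byte{0xAA, 0xBB, 0xCC, 0, 0, 0, 0}...)
	enc, data, err := parseChunkFrame(frame)
	fmt.Println(enc, data, err)
}
```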
@@ -2414,7 +2408,8 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
fetchBegin = time.Now()

// Read entire chunk into new buffer.
nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.off), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
// TODO: readChunkRange call could be avoided for any chunk but last in this particular part.
nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
if err != nil {
return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen)
}
@@ -2428,17 +2423,21 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
r.stats.chunksFetchCount++
r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
r.stats.chunksFetchedSizeSum += len(*nb)

if err := populateChunk(&(res[pIdx.i].chks[pIdx.j]), rawChunk((*nb)[n:]), aggrs, r.savior); err != nil {
if err := populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save); err != nil {
r.block.chunkPool.Put(nb)
return errors.Wrap(err, "populate chunk")
}
r.stats.chunksTouched++
r.stats.chunksTouchedSizeSum += int(chunkDataLen)

r.block.chunkPool.Put(nb)
}
return nil
}

func (r *bucketChunkReader) savior(b []byte) ([]byte, error) {
// save saves a copy of b's payload to a memory pool of its own and returns a new byte slice referencing said copy.
// Returned slice becomes invalid once r.block.chunkPool.Put() is called.
func (r *bucketChunkReader) save(b []byte) ([]byte, error) {
// Ensure we never grow slab beyond original capacity.
if len(r.chunkBytes) == 0 ||
cap(*r.chunkBytes[len(r.chunkBytes)-1])-len(*r.chunkBytes[len(r.chunkBytes)-1]) < len(b) {
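save copies chunk payloads into slabs so that the returned slices stay valid independently of the read buffers, and, as the condition above shows, it never grows a slab beyond its original capacity. A simplified sketch of that copy-into-slab pattern, without the chunk pool used in the real reader (the slab size is an arbitrary assumption here):

```go
package main

import "fmt"

type slabSaver struct {
	slabs    [][]byte
	slabSize int
}

// save appends a copy of b to the current slab, starting a new slab when
// the remaining capacity is too small; it never grows an existing slab,
// so previously returned slices are never invalidated by reallocation.
func (s *slabSaver) save(b []byte) []byte {
	if n := len(s.slabs); n == 0 || cap(s.slabs[n-1])-len(s.slabs[n-1]) < len(b) {
		s.slabs = append(s.slabs, make([]byte, 0, s.slabSize))
	}
	slab := &s.slabs[len(s.slabs)-1]
	start := len(*slab)
	*slab = append(*slab, b...)
	return (*slab)[start : start+len(b)]
}

func main() {
	s := &slabSaver{slabSize: 8}
	a := s.save([]byte("abc"))
	b := s.save([]byte("defghij")) // does not fit next to "abc", so a new slab is started
	fmt.Printf("%s %s slabs=%d\n", a, b, len(s.slabs))
}
```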
8 changes: 6 additions & 2 deletions pkg/store/bucket_test.go
@@ -20,6 +20,7 @@ import (
"strconv"
"sync"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/gogo/protobuf/proto"
@@ -1053,6 +1054,7 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in
}, nil)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc))
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc))

return id
}
@@ -2328,7 +2330,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck
head, _ := storetestutil.CreateHeadWithSeries(b, 0, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "head"),
SamplesPerSeries: 86400 / 15, // Simulate 1 day block with 15s scrape interval.
ScrapeInterval: 15 * 1000,
ScrapeInterval: 15 * time.Second,
Series: 1000,
PrependLabels: nil,
Random: rand.New(rand.NewSource(120)),
@@ -2349,7 +2351,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck
testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))

if resolutionLevel > 0 {
// Downsample newly-created block
// Downsample newly-created block.
blockID, err = downsample.Downsample(logger, blockMeta, head, tmpDir, int64(resolutionLevel))
testutil.Ok(b, err)
blockMeta, err = metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String()))
Expand Down Expand Up @@ -2412,6 +2414,8 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
}

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
// TODO FIXME! testutil.Ok calls b.Fatalf under the hood, which
// must be called only from the goroutine running the Benchmark function.
testutil.Ok(b, err)

indexReader := blk.indexReader(ctx)
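The TODO added in this hunk points at a real constraint: testutil.Ok wraps b.Fatalf, and Go's testing package requires Fatal/FailNow to be called only from the goroutine running the Test or Benchmark function. One common workaround, sketched under the assumption that the work happens in worker goroutines (the names here are illustrative, not from this benchmark), is to ship errors back and fail on the benchmark goroutine:

```go
package store_test

import (
	"sync"
	"testing"
)

// benchmarkWithWorkers runs work concurrently but reports failures only
// from the goroutine that owns *testing.B.
func benchmarkWithWorkers(b *testing.B, concurrency int, work func() error) {
	errCh := make(chan error, concurrency)
	var wg sync.WaitGroup
	for w := 0; w < concurrency; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Do not call b.Fatal here; only report the error.
			if err := work(); err != nil {
				errCh <- err
			}
		}()
	}
	wg.Wait()
	close(errCh)
	for err := range errCh {
		b.Fatal(err) // safe: we are back on the benchmark goroutine
	}
}
```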
16 changes: 9 additions & 7 deletions pkg/store/storepb/testutil/series.go
@@ -13,6 +13,7 @@ import (
"runtime"
"sort"
"testing"
"time"

"github.com/gogo/protobuf/types"
"github.com/prometheus/prometheus/pkg/labels"
@@ -39,8 +40,9 @@ func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings {
}

type HeadGenOptions struct {
TSDBDir string
SamplesPerSeries, Series, ScrapeInterval int
TSDBDir string
SamplesPerSeries, Series int
ScrapeInterval time.Duration

WithWAL bool
PrependLabels labels.Labels
@@ -58,14 +60,14 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,
t.Fatal("samples and series has to be 1 or more")
}
if opts.ScrapeInterval == 0 {
opts.ScrapeInterval = 1
opts.ScrapeInterval = 1 * time.Millisecond
}

fmt.Printf(
"Creating %d %d-sample series with %d ms interval in %s\n",
"Creating %d %d-sample series with %s interval in %s\n",
opts.Series,
opts.SamplesPerSeries,
opts.ScrapeInterval,
opts.ScrapeInterval.String(),
opts.TSDBDir,
)

@@ -88,13 +90,13 @@
tsLabel := j*opts.Series*opts.SamplesPerSeries + i*opts.SamplesPerSeries
ref, err := app.Add(
labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix)),
int64(tsLabel*opts.ScrapeInterval),
int64(tsLabel)*opts.ScrapeInterval.Milliseconds(),
opts.Random.Float64(),
)
testutil.Ok(t, err)

for is := 1; is < opts.SamplesPerSeries; is++ {
testutil.Ok(t, app.AddFast(ref, int64((tsLabel+is)*opts.ScrapeInterval), opts.Random.Float64()))
testutil.Ok(t, app.AddFast(ref, int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), opts.Random.Float64()))
}
}
testutil.Ok(t, app.Commit())
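With ScrapeInterval now a time.Duration, the last hunk derives sample timestamps as int64(index) * interval.Milliseconds() instead of multiplying raw ints. A tiny standalone check of what that arithmetic yields for the defaults used above (the 1ms fallback versus a 15s interval):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	for _, interval := range []time.Duration{1 * time.Millisecond, 15 * time.Second} {
		for i := int64(0); i < 3; i++ {
			// Same arithmetic as CreateHeadWithSeries: sample index times interval in ms.
			fmt.Printf("interval=%s sample %d -> t=%d ms\n", interval, i, i*interval.Milliseconds())
		}
	}
}
```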
