Skip to content

Commit

Permalink
downsampling: simplify StreamedBlockWriter interface
Browse files Browse the repository at this point in the history
Reduce of use public Flush method to finalize index and meta files.
In case of error, a caller has to remove block directory with a preserved
garbage inside.

Rid of use tmp directories and renaming, syncing the final block on disk
before upload.
  • Loading branch information
xjewer committed Feb 6, 2019
1 parent ff9c776 commit fb50a78
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 194 deletions.
7 changes: 6 additions & 1 deletion pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ const (
MetaFilename = "meta.json"
)

const (
// MetaVersion is a enumeration of versions supported by Thanos.
MetaVersion1 = iota + 1
)

// Meta describes the a block's meta. It wraps the known TSDB meta structure and
// extends it by Thanos-specific fields.
type Meta struct {
Expand Down Expand Up @@ -135,7 +140,7 @@ func Read(dir string) (*Meta, error) {
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
if m.Version != 1 {
if m.Version != MetaVersion1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}
return &m, nil
Expand Down
48 changes: 36 additions & 12 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package downsample

import (
"math"
"math/rand"
"os"
"path/filepath"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -47,9 +51,33 @@ func Downsample(
}
defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader")

// Generate new block id.
uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))

// Create block directory to populate with chunks, meta and index files into.
blockDir := filepath.Join(dir, uid.String())
if err := os.MkdirAll(blockDir, 0777); err != nil {
return id, errors.Wrap(err, "mkdir block dir")
}

// Remove blockDir in case of errors.
defer func() {
if err != nil {
var merr tsdb.MultiError
merr.Add(err)
merr.Add(os.RemoveAll(blockDir))
err = merr.Err()
}
}()

// Copy original meta to the new one. Update downsampling resolution and ULID for a new block.
newMeta := *origMeta
newMeta.Thanos.Downsample.Resolution = resolution
newMeta.ULID = uid

// Writes downsampled chunks right into the files, avoiding excess memory allocation.
// Flushes index and meta data afterwards aggregations.
streamedBlockWriter, err := NewWriter(dir, indexr, logger, *origMeta, resolution)
// Flushes index and meta data after aggregations.
streamedBlockWriter, err := NewStreamedBlockWriter(blockDir, indexr, logger, newMeta)
if err != nil {
return id, errors.Wrap(err, "get streamed block writer")
}
Expand Down Expand Up @@ -93,11 +121,11 @@ func Downsample(
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At())
}
}
if err := streamedBlockWriter.AddSeries(lset, downsampleRaw(all, resolution)); err != nil {
if err := streamedBlockWriter.WriteSeries(lset, downsampleRaw(all, resolution)); err != nil {
return id, errors.Wrapf(err, "downsample raw data, series: %d", postings.At())
}
} else {
// Downsample a block that contains aggregate chunks already.
// Downsample a block that contains aggregated chunks already.
for _, c := range chks {
aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk))
}
Expand All @@ -112,21 +140,17 @@ func Downsample(
if err != nil {
return id, errors.Wrapf(err, "downsample aggregate block, series: %d", postings.At())
}
if err := streamedBlockWriter.AddSeries(lset, downsampledChunks); err != nil {
return id, errors.Wrapf(err, "downsample aggregated block, series: %d", postings.At())
if err := streamedBlockWriter.WriteSeries(lset, downsampledChunks); err != nil {
return id, errors.Wrapf(err, "write series: %d", postings.At())
}
}
}
if postings.Err() != nil {
return id, errors.Wrap(postings.Err(), "iterate series set")
}

id, err = streamedBlockWriter.Flush()
if err != nil {
return id, errors.Wrap(err, "flush data in stream data")
}

return id, nil
id = uid
return
}

// currentWindow returns the end timestamp of the window that t falls into.
Expand Down
9 changes: 8 additions & 1 deletion pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,16 @@ func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
}

func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) {
return tsdb.EmptyTombstoneReader(), nil
return emptyTombstoneReader{}, nil
}

func (b *memBlock) Close() error {
return nil
}

type emptyTombstoneReader struct{}

func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil }
func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil }
func (emptyTombstoneReader) Total() uint64 { return 0 }
func (emptyTombstoneReader) Close() error { return nil }
Loading

0 comments on commit fb50a78

Please sign in to comment.