Skip to content

Commit

Permalink
Merge pull request #4 from krasi-georgiev/pull/374-review
Browse files Browse the repository at this point in the history
Pull/374 review
  • Loading branch information
codesome authored Jan 16, 2019
2 parents 2378d2b + e0bb757 commit efa23ce
Show file tree
Hide file tree
Showing 34 changed files with 1,101 additions and 756 deletions.
11 changes: 9 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@
sudo: required
dist: trusty
language: go
os:
- windows
- linux
- osx

go:
- 1.10.x
- 1.11.x

go_import_path: github.com/prometheus/tsdb

before_install:
- if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then choco install make; fi

install:
- go get -v -t ./...
- make deps

script:
# `staticcheck` target is omitted due to linting errors
- make check_license style unused test
- if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then make test; else make; fi
23 changes: 19 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
## master / unreleased
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change:
- added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- new public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.

- `LastCheckpoint` used to return just the segment name and now it returns the full relative path.
- `NewSegmentsRangeReader` can now read over miltiple wal ranges by using the new `SegmentRange` struct.
- `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors.
- \[CHANGE\] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
## 0.3.1
- [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers.

## 0.3.0
- [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path.
- [CHANGE] `NewSegmentsRangeReader()` can now read over miltiple wal ranges by using the new `SegmentRange{}` struct.
- [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors.
- [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)`
- [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field.
- [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset.
-[FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc.
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ TSDB_BENCHMARK_NUM_METRICS ?= 1000
TSDB_BENCHMARK_DATASET ?= "$(TSDB_PROJECT_DIR)/testdata/20kseries.json"
TSDB_BENCHMARK_OUTPUT_DIR ?= "$(TSDB_CLI_DIR)/benchout"

STATICCHECK_IGNORE =
include Makefile.common

.PHONY: deps
deps:
@echo ">> getting dependencies"
GO111MODULE=$(GO111MODULE) $(GO) get $(GOOPTS) -t ./...

build:
@$(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR)
GO111MODULE=$(GO111MODULE) $(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR)

bench: build
@echo ">> running benchmark, writing result to $(TSDB_BENCHMARK_OUTPUT_DIR)"
Expand Down
38 changes: 36 additions & 2 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"path/filepath"
"sync"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
Expand Down Expand Up @@ -140,6 +142,12 @@ type Appendable interface {
Appender() Appender
}

// SizeReader returns the size of the object in bytes.
type SizeReader interface {
// Size returns the size in bytes.
Size() int64
}

// BlockMeta provides meta information about a block.
type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
Expand All @@ -166,6 +174,7 @@ type BlockStats struct {
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytes int64 `json:"numBytes,omitempty"`
}

// BlockDesc describes a block by ULID and time range.
Expand Down Expand Up @@ -260,7 +269,10 @@ type Block struct {

// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) {
if logger == nil {
logger = log.NewNopLogger()
}
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
Expand All @@ -275,11 +287,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return nil, err
}

tr, err := readTombstones(dir)
tr, tsr, err := readTombstones(dir)
if err != nil {
return nil, err
}

// TODO refactor to set this at block creation time as
// that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs
err = writeMetaFile(dir, meta)
if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
}

pb := &Block{
dir: dir,
meta: *meta,
Expand All @@ -291,6 +312,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return pb, nil
}

func blockSize(rr ...SizeReader) int64 {
var total int64
for _, r := range rr {
if r != nil {
total += r.Size()
}
}
return total
}

// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
pb.mtx.Lock()
Expand Down Expand Up @@ -318,6 +349,9 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }

// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")

Expand Down
55 changes: 16 additions & 39 deletions block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"

"github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
)
Expand All @@ -46,60 +45,40 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)

b := createEmptyBlock(t, tmpdir, &BlockMeta{Version: 2})

blockDir := createBlock(t, tmpdir, 0, 0, 0)
b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed)
testutil.Ok(t, b.setCompactionFailed())
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())

b, err = OpenBlock(tmpdir, nil)
b, err = OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())
}

// createEmpty block creates a block with the given meta but without any data.
func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {
testutil.Ok(t, os.MkdirAll(dir, 0777))

testutil.Ok(t, writeMetaFile(dir, meta))

ir, err := index.NewWriter(filepath.Join(dir, indexFilename))
testutil.Ok(t, err)
testutil.Ok(t, ir.Close())

testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777))

testutil.Ok(t, writeTombstoneFile(dir, newMemTombstones()))

b, err := OpenBlock(dir, nil)
testutil.Ok(t, err)
return b
}

// createPopulatedBlock creates a block with nSeries series, and nSamples samples.
func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Block {
// createBlock creates a block with nSeries series, filled with
// samples of the given mint,maxt time range and returns its dir.
func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) string {
head, err := NewHead(nil, nil, nil, 2*60*60*1000)
testutil.Ok(tb, err)
defer head.Close()

lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), nSeries)
testutil.Ok(tb, err)
refs := make([]uint64, nSeries)
var ref uint64

for n := 0; n < nSamples; n++ {
for ts := mint; ts <= maxt; ts++ {
app := head.Appender()
ts := n * 1000
for i, lbl := range lbls {
if refs[i] != 0 {
err := app.AddFast(refs[i], int64(ts), rand.Float64())
if err == nil {
continue
}
for _, lbl := range lbls {
err := app.AddFast(ref, ts, rand.Float64())
if err == nil {
continue
}
ref, err := app.Add(lbl, int64(ts), rand.Float64())
ref, err = app.Add(lbl, int64(ts), rand.Float64())
testutil.Ok(tb, err)
refs[i] = ref
}
err := app.Commit()
testutil.Ok(tb, err)
Expand All @@ -113,7 +92,5 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Blo
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil)
testutil.Ok(tb, err)

blk, err := OpenBlock(filepath.Join(dir, ulid.String()), nil)
testutil.Ok(tb, err)
return blk
return filepath.Join(dir, ulid.String())
}
10 changes: 8 additions & 2 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type CheckpointStats struct {
DroppedSamples int
DroppedTombstones int
TotalSeries int // Processed series including dropped ones.
TotalSamples int // Processed samples inlcuding dropped ones.
TotalSamples int // Processed samples including dropped ones.
TotalTombstones int // Processed tombstones including dropped ones.
}

Expand Down Expand Up @@ -128,7 +128,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
defer sgmReader.Close()
}

cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to))
cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to))
cpdirtmp := cpdir + ".tmp"

if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
Expand All @@ -139,6 +139,12 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
return nil, errors.Wrap(err, "open checkpoint")
}

// Ensures that an early return caused by an error doesn't leave any tmp files.
defer func() {
cp.Close()
os.RemoveAll(cpdirtmp)
}()

r := wal.NewReader(sgmReader)

var (
Expand Down
34 changes: 32 additions & 2 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package tsdb

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"

"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
Expand All @@ -31,11 +34,11 @@ func TestLastCheckpoint(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(dir)

s, k, err := LastCheckpoint(dir)
_, _, err = LastCheckpoint(dir)
testutil.Equals(t, ErrNotFound, err)

testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777))
s, k, err = LastCheckpoint(dir)
s, k, err := LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, filepath.Join(dir, "checkpoint.0000"), s)
testutil.Equals(t, 0, k)
Expand Down Expand Up @@ -180,3 +183,30 @@ func TestCheckpoint(t *testing.T) {
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
}, series)
}

func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
// Create a new wal with an invalid records.
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := wal.NewSize(nil, nil, dir, 64*1024)
testutil.Ok(t, err)
testutil.Ok(t, w.Log([]byte{99}))
w.Close()

// Run the checkpoint and since the wal contains an invalid records this should return an error.
_, err = Checkpoint(w, 0, 1, nil, 0)
testutil.NotOk(t, err)

// Walk the wal dir to make sure there are no tmp folder left behind after the error.
err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error {
if err != nil {
return errors.Wrapf(err, "access err %q: %v\n", path, err)
}
if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") {
return fmt.Errorf("wal dir contains temporary folder:%s", info.Name())
}
return nil
})
testutil.Ok(t, err)
}
Loading

0 comments on commit efa23ce

Please sign in to comment.