Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Merge branch 'master' into remove-PrefixMatcher
Browse files Browse the repository at this point in the history
Signed-off-by: Krasi Georgiev <[email protected]>
  • Loading branch information
Krasi Georgiev committed Dec 28, 2018
2 parents 1a8f9df + 6d489a1 commit 5053fd0
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 25 deletions.
9 changes: 8 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@
sudo: required
dist: trusty
language: go
os:
- windows
- linux
- osx

go:
- 1.10.x
- 1.11.x

go_import_path: github.com/prometheus/tsdb

before_install:
- if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then choco install make; fi

install:
- go get -v -t ./...

script:
# `staticcheck` target is omitted due to linting errors
- make check_license style unused test
- if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then make test; else make check_license style unused test; fi
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
## master / unreleased

- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller WAL files, for example when using tmpfs on a Raspberry Pi to minimise SD card wear from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [REMOVED] `PrefixMatcher` is considered unused so was removed.

## 0.3.0
## 0.3.1
- [BUGFIX] Fixed most Windows tests and some actual bugs caused by unclosed file readers.

## 0.3.0
- [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path.
- [CHANGE] `NewSegmentsRangeReader()` can now read over multiple WAL ranges by using the new `SegmentRange{}` struct.
- [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors.
Expand Down
10 changes: 9 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
// millisecond precision timestamps.
var DefaultOptions = &Options{
WALFlushInterval: 5 * time.Second,
WALSegmentSize: wal.DefaultSegmentSize,
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
NoLockfile: false,
Expand All @@ -55,6 +56,9 @@ type Options struct {
// The interval at which the write ahead log is flushed to disk.
WALFlushInterval time.Duration

// Maximum size of a single WAL segment file, in bytes.
WALSegmentSize int

// Duration of persisted data to keep.
RetentionDuration uint64

Expand Down Expand Up @@ -263,7 +267,11 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
return nil, errors.Wrap(err, "create leveled compactor")
}

wlog, err := wal.New(l, r, filepath.Join(dir, "wal"))
segmentSize := wal.DefaultSegmentSize
if opts.WALSegmentSize > 0 {
segmentSize = opts.WALSegmentSize
}
wlog, err := wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
if err != nil {
return nil, err
}
Expand Down
26 changes: 26 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,32 @@ func TestWALFlushedOnDBClose(t *testing.T) {
testutil.Equals(t, []string{"labelvalue"}, values)
}

// TestWALSegmentSizeOption verifies that a custom Options.WALSegmentSize is
// honoured when the DB writes its WAL: every segment except the last must be
// exactly WALSegmentSize bytes, and the last one must be strictly smaller.
func TestWALSegmentSizeOption(t *testing.T) {
	// Work on a copy so the shared DefaultOptions value is not mutated.
	options := *DefaultOptions
	options.WALSegmentSize = 2 * 32 * 1024

	db, cleanup := openTestDB(t, &options)
	defer cleanup()

	// Commit after every sample; the test expects this volume of writes to
	// spill over into more than one segment at the configured size.
	app := db.Appender()
	for i := int64(0); i < 155; i++ {
		_, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64())
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())
	}

	// Close the DB before inspecting the WAL directory and surface a failed
	// close instead of silently dropping it.
	dbDir := db.Dir()
	testutil.Ok(t, db.Close())

	// Check the ReadDir error first: on failure `files` is not meaningful and
	// the length assertion below would report a misleading message.
	files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
	testutil.Ok(t, err)
	testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.")

	last := len(files) - 1
	for i, f := range files {
		if i == last {
			// The final segment is expected to be only partially filled.
			testutil.Assert(t, int64(options.WALSegmentSize) > f.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", f.Name())
			continue
		}
		testutil.Equals(t, int64(options.WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
	}
}

func TestTombstoneClean(t *testing.T) {
numSamples := int64(10)

Expand Down
15 changes: 10 additions & 5 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type headMetrics struct {
maxTime prometheus.GaugeFunc
samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary
walCorruptionsTotal prometheus.Counter
headTruncateFail prometheus.Counter
headTruncateTotal prometheus.Counter
checkpointDeleteFail prometheus.Counter
Expand Down Expand Up @@ -152,6 +153,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
})
m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_corruptions_total",
Help: "Total number of WAL corruptions.",
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended samples.",
Expand Down Expand Up @@ -195,6 +200,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.maxTime,
m.gcDuration,
m.walTruncateDuration,
m.walCorruptionsTotal,
m.samplesAppended,
m.headTruncateFail,
m.headTruncateTotal,
Expand Down Expand Up @@ -473,18 +479,17 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil {
return errors.Wrap(err, "open WAL segments")
}
defer sr.Close()

err = h.loadWAL(wal.NewReader(sr))
sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows.
if err == nil {
return nil
}
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)

h.metrics.walCorruptionsTotal.Inc()
if err := h.wal.Repair(err); err != nil {
return errors.Wrap(err, "repair corrupted WAL")
}

return nil
}

Expand All @@ -501,7 +506,7 @@ func (h *Head) Truncate(mint int64) (err error) {
return nil
}
atomic.StoreInt64(&h.minTime, mint)
h.minValidTime = mint
atomic.StoreInt64(&h.minValidTime, mint)

// Ensure that max time is at least as high as min time.
for h.MaxTime() < mint {
Expand Down Expand Up @@ -657,7 +662,7 @@ func (h *Head) appender() *headAppender {
head: h,
// Set the minimum valid time to whichever is greater: the head min valid time or the compaction window.
// This ensures that no samples will be added within the compaction window to avoid races.
minValidTime: max(h.minValidTime, h.MaxTime()-h.chunkRange/2),
minValidTime: max(atomic.LoadInt64(&h.minValidTime), h.MaxTime()-h.chunkRange/2),
mint: math.MaxInt64,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
Expand Down
12 changes: 9 additions & 3 deletions head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sort"
"testing"

prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
Expand Down Expand Up @@ -118,11 +119,11 @@ func TestHead_ReadWAL(t *testing.T) {

w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
defer w.Close()
populateTestWAL(t, w, entries)

head, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
defer head.Close()

testutil.Ok(t, head.Init(math.MinInt64))
testutil.Equals(t, uint64(100), head.lastSeriesID)
Expand Down Expand Up @@ -282,11 +283,11 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {

w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
defer w.Close()
populateTestWAL(t, w, entries)

head, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
defer head.Close()

testutil.Ok(t, head.Init(math.MinInt64))

Expand Down Expand Up @@ -389,6 +390,7 @@ Outer:
func TestDeleteUntilCurMax(t *testing.T) {
numSamples := int64(10)
hb, err := NewHead(nil, nil, nil, 1000000)
defer hb.Close()
testutil.Ok(t, err)
app := hb.Appender()
smpls := make([]float64, numSamples)
Expand Down Expand Up @@ -478,6 +480,7 @@ func TestDelete_e2e(t *testing.T) {
defer os.RemoveAll(dir)
hb, err := NewHead(nil, nil, nil, 100000)
testutil.Ok(t, err)
defer hb.Close()
app := hb.Appender()
for _, l := range lbls {
ls := labels.New(l...)
Expand Down Expand Up @@ -845,6 +848,7 @@ func TestHead_LogRollback(t *testing.T) {

w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
defer w.Close()
h, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)

Expand Down Expand Up @@ -911,6 +915,7 @@ func TestWalRepair(t *testing.T) {

w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
defer w.Close()

for i := 1; i <= test.totalRecs; i++ {
// At this point insert a corrupted record.
Expand All @@ -923,7 +928,9 @@ func TestWalRepair(t *testing.T) {

h, err := NewHead(nil, nil, w, 1)
testutil.Ok(t, err)
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
testutil.Ok(t, h.Init(math.MinInt64))
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))

sr, err := wal.NewSegmentsReader(dir)
testutil.Ok(t, err)
Expand All @@ -936,7 +943,6 @@ func TestWalRepair(t *testing.T) {
}
testutil.Ok(t, r.Err())
testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records")

})
}
}
10 changes: 5 additions & 5 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

const (
defaultSegmentSize = 128 * 1024 * 1024 // 128 MB
DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB
pageSize = 32 * 1024 // 32KB
recordHeaderSize = 7
)
Expand Down Expand Up @@ -174,7 +174,7 @@ type WAL struct {

// New returns a new WAL over the given directory.
func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) {
return NewSize(logger, reg, dir, defaultSegmentSize)
return NewSize(logger, reg, dir, DefaultSegmentSize)
}

// NewSize returns a new WAL over the given directory.
Expand Down Expand Up @@ -298,9 +298,6 @@ func (w *WAL) Repair(origErr error) error {
level.Warn(w.logger).Log("msg", "deleting all segments behind corruption", "segment", cerr.Segment)

for _, s := range segs {
if s.index <= cerr.Segment {
continue
}
if w.segment.i == s.index {
// The active segment needs to be removed,
// close it first (Windows!). Can be closed safely
Expand All @@ -310,6 +307,9 @@ func (w *WAL) Repair(origErr error) error {
return errors.Wrap(err, "close active segment")
}
}
if s.index <= cerr.Segment {
continue
}
if err := os.Remove(filepath.Join(w.dir, s.name)); err != nil {
return errors.Wrapf(err, "delete segment:%v", s.index)
}
Expand Down
9 changes: 1 addition & 8 deletions wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"
"testing"

"github.com/pkg/errors"
"github.com/prometheus/tsdb/testutil"
)

Expand Down Expand Up @@ -336,14 +335,8 @@ func TestWAL_Repair(t *testing.T) {
}
testutil.NotOk(t, r.Err())
testutil.Ok(t, sr.Close())
testutil.Ok(t, w.Repair(r.Err()))

// See https://github.com/prometheus/prometheus/issues/4603
// We need to close w.segment because it needs to be deleted.
// But this is to mainly artificially test Repair() again.
testutil.Ok(t, w.segment.Close())
testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err")))

testutil.Ok(t, w.Repair(r.Err()))
sr, err = NewSegmentsReader(dir)
testutil.Ok(t, err)
r = NewReader(sr)
Expand Down
2 changes: 2 additions & 0 deletions wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !windows

package tsdb

import (
Expand Down

0 comments on commit 5053fd0

Please sign in to comment.