Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Fix review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <[email protected]>
  • Loading branch information
codesome committed Sep 26, 2018
1 parent 632dfb3 commit 80d6431
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 34 deletions.
23 changes: 1 addition & 22 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ import (
"strconv"
"strings"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal"
)
Expand Down Expand Up @@ -101,12 +98,7 @@ const checkpointPrefix = "checkpoint."
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
//
// Non-critical errors are logged and not returned.
func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64, checkpointDeleteFail prometheus.Counter) (*CheckpointStats, error) {
if logger == nil {
logger = log.NewNopLogger()
}
func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
stats := &CheckpointStats{}

var sr io.Reader
Expand Down Expand Up @@ -273,18 +265,5 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
if err := closeAll(closers...); err != nil {
return stats, errors.Wrap(err, "close opened files")
}
if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(logger).Log("msg", "truncating segments failed", "err", err)
}
if err := DeleteCheckpoints(w.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
checkpointDeleteFail.Add(float64(1))
}
return stats, nil
}
7 changes: 4 additions & 3 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"path/filepath"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
Expand Down Expand Up @@ -138,10 +137,12 @@ func TestCheckpoint(t *testing.T) {
}
testutil.Ok(t, w.Close())

_, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool {
_, err = Checkpoint(w, 100, 106, func(x uint64) bool {
return x%2 == 0
}, last/2, prometheus.NewCounter(prometheus.CounterOpts{}))
}, last/2)
testutil.Ok(t, err)
testutil.Ok(t, w.Truncate(107))
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))

// Only the new checkpoint should be left.
files, err := fileutil.ReadDir(dir)
Expand Down
26 changes: 23 additions & 3 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type headMetrics struct {
samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary
checkpointDeleteFail prometheus.Counter
checkpointsFail prometheus.Counter
}

func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Expand Down Expand Up @@ -152,8 +153,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Help: "Total number of appended samples.",
})
m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_delete_fail",
Help: "Number of times deletion of old checkpoint failed.",
Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
Help: "Total number of checkpoint deletions that failed.",
})
m.checkpointsFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoints_failed_total",
Help: "Total number of checkpoints that failed.",
})

if r != nil {
Expand All @@ -172,6 +177,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.walTruncateDuration,
m.samplesAppended,
m.checkpointDeleteFail,
m.checkpointsFail,
)
}
return m
Expand Down Expand Up @@ -475,9 +481,23 @@ func (h *Head) Truncate(mint int64) error {
keep := func(id uint64) bool {
return h.series.getByID(id) != nil
}
if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint, h.metrics.checkpointDeleteFail); err != nil {
if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil {
h.metrics.checkpointsFail.Inc()
return errors.Wrap(err, "create checkpoint")
}
if err := h.wal.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
}
if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
h.metrics.checkpointDeleteFail.Inc()
}
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())

level.Info(h.logger).Log("msg", "WAL checkpoint complete",
Expand Down
15 changes: 9 additions & 6 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
Help: "Total number of completed pages.",
})
w.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_truncate_fail",
Help: "Number of times WAL truncation failed.",
Name: "prometheus_tsdb_wal_truncate_failed_total",
Help: "Total number of WAL truncations that failed.",
})
if reg != nil {
reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail)
Expand Down Expand Up @@ -532,18 +532,21 @@ func (w *WAL) Segments() (m, n int, err error) {
}

// Truncate drops all segments before i.
func (w *WAL) Truncate(i int) error {
func (w *WAL) Truncate(i int) (err error) {
defer func() {
if err != nil {
w.truncateFail.Inc()
}
}()
refs, err := listSegments(w.dir)
if err != nil {
w.truncateFail.Add(float64(1))
return err
}
for _, r := range refs {
if r.n >= i {
break
}
if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil {
w.truncateFail.Add(float64(1))
if err = os.Remove(filepath.Join(w.dir, r.s)); err != nil {
return err
}
}
Expand Down

0 comments on commit 80d6431

Please sign in to comment.