diff --git a/checkpoint.go b/checkpoint.go index b8de5d14..74c539af 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -23,7 +23,10 @@ import ( "strconv" "strings" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/wal" ) @@ -98,7 +101,9 @@ const checkpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { +// +// Non-critical errors are logged and not returned. +func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64, checkpointDeleteTotal, checkpointDeleteFail prometheus.Counter) (*CheckpointStats, error) { stats := &CheckpointStats{} var sr io.Reader @@ -265,5 +270,23 @@ func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*C if err := closeAll(closers...); err != nil { return stats, errors.Wrap(err, "close opened files") } + if err := w.Truncate(n + 1); err != nil { + // If truncating fails, we'll just try again at the next checkpoint. + // Leftover segments will just be ignored in the future if there's a checkpoint + // that supersedes them. + level.Error(logger).Log("msg", "truncating segments failed", "err", err) + } + if checkpointDeleteTotal != nil { + checkpointDeleteTotal.Inc() + } + if err := DeleteCheckpoints(w.Dir(), n); err != nil { + // Leftover old checkpoints do not cause problems down the line beyond + // occupying disk space. + // They will just be ignored since a higher checkpoint exists. + level.Error(logger).Log("msg", "delete old checkpoints", "err", err) + if checkpointDeleteFail != nil { + checkpointDeleteFail.Inc() + } + } return stats, nil } diff --git a/checkpoint_test.go b/checkpoint_test.go index 60f99fd7..4cb4c5a6 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -137,9 +137,9 @@ func TestCheckpoint(t *testing.T) { } testutil.Ok(t, w.Close()) - _, err = Checkpoint(w, 100, 106, func(x uint64) bool { + _, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool { return x%2 == 0 - }, last/2) + }, last/2, nil, nil) testutil.Ok(t, err) testutil.Ok(t, w.Truncate(107)) testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106)) diff --git a/head.go b/head.go index 8d259fd6..a2651640 100644 --- a/head.go +++ b/head.go @@ -512,24 +512,13 @@ func (h *Head) Truncate(mint int64) (err error) { return h.series.getByID(id) != nil } h.metrics.checkpointCreationTotal.Inc() - if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil { + _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint, + h.metrics.checkpointDeleteTotal, h.metrics.checkpointDeleteFail) + if err != nil { h.metrics.checkpointCreationFail.Inc() return errors.Wrap(err, "create checkpoint") } - if err := h.wal.Truncate(n + 1); err != nil { - // If truncating fails, we'll just try again at the next checkpoint. - // Leftover segments will just be ignored in the future if there's a checkpoint - // that supersedes them. - level.Error(h.logger).Log("msg", "truncating segments failed", "err", err) - } - h.metrics.checkpointDeleteTotal.Inc() - if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil { - // Leftover old checkpoints do not cause problems down the line beyond - // occupying disk space. - // They will just be ignored since a higher checkpoint exists. - level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err) - h.metrics.checkpointDeleteFail.Inc() - } + h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) level.Info(h.logger).Log("msg", "WAL checkpoint complete",