From 61b000ee0e6a6095ef194e18f47a42ebb51c2acb Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 25 Sep 2018 19:18:33 +0530 Subject: [PATCH] Fix review comments Signed-off-by: Ganesh Vernekar --- checkpoint.go | 23 +----------- checkpoint_test.go | 7 ++-- head.go | 88 ++++++++++++++++++++++++++++++++++++---------- wal/wal.go | 23 ++++++++---- 4 files changed, 91 insertions(+), 50 deletions(-) diff --git a/checkpoint.go b/checkpoint.go index 5559452a..b8de5d14 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -23,10 +23,7 @@ import ( "strconv" "strings" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/wal" ) @@ -101,12 +98,7 @@ const checkpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -// -// Non-critical errors are logged and not returned. -func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64, checkpointDeleteFail prometheus.Counter) (*CheckpointStats, error) { - if logger == nil { - logger = log.NewNopLogger() - } +func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sr io.Reader @@ -273,18 +265,5 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo if err := closeAll(closers...); err != nil { return stats, errors.Wrap(err, "close opened files") } - if err := w.Truncate(n + 1); err != nil { - // If truncating fails, we'll just try again at the next checkpoint. - // Leftover segments will just be ignored in the future if there's a checkpoint - // that supersedes them. - level.Error(logger).Log("msg", "truncating segments failed", "err", err) - } - if err := DeleteCheckpoints(w.Dir(), n); err != nil { - // Leftover old checkpoints do not cause problems down the line beyond - // occupying disk space. - // They will just be ignored since a higher checkpoint exists. - level.Error(logger).Log("msg", "delete old checkpoints", "err", err) - checkpointDeleteFail.Add(float64(1)) - } return stats, nil } diff --git a/checkpoint_test.go b/checkpoint_test.go index 97130c29..60f99fd7 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -20,7 +20,6 @@ import ( "path/filepath" "testing" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" @@ -138,10 +137,12 @@ func TestCheckpoint(t *testing.T) { } testutil.Ok(t, w.Close()) - _, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool { + _, err = Checkpoint(w, 100, 106, func(x uint64) bool { return x%2 == 0 - }, last/2, prometheus.NewCounter(prometheus.CounterOpts{})) + }, last/2) testutil.Ok(t, err) + testutil.Ok(t, w.Truncate(107)) + testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106)) // Only the new checkpoint should be left. files, err := fileutil.ReadDir(dir) diff --git a/head.go b/head.go index e5afccff..8d259fd6 100644 --- a/head.go +++ b/head.go @@ -76,20 +76,25 @@ type Head struct { } type headMetrics struct { - activeAppenders prometheus.Gauge - series prometheus.Gauge - seriesCreated prometheus.Counter - seriesRemoved prometheus.Counter - seriesNotFound prometheus.Counter - chunks prometheus.Gauge - chunksCreated prometheus.Counter - chunksRemoved prometheus.Counter - gcDuration prometheus.Summary - minTime prometheus.GaugeFunc - maxTime prometheus.GaugeFunc - samplesAppended prometheus.Counter - walTruncateDuration prometheus.Summary - checkpointDeleteFail prometheus.Counter + activeAppenders prometheus.Gauge + series prometheus.Gauge + seriesCreated prometheus.Counter + seriesRemoved prometheus.Counter + seriesNotFound prometheus.Counter + chunks prometheus.Gauge + chunksCreated prometheus.Counter + chunksRemoved prometheus.Counter + gcDuration prometheus.Summary + minTime prometheus.GaugeFunc + maxTime prometheus.GaugeFunc + samplesAppended prometheus.Counter + walTruncateDuration prometheus.Summary + headTruncateFail prometheus.Counter + headTruncateTotal prometheus.Counter + checkpointDeleteFail prometheus.Counter + checkpointDeleteTotal prometheus.Counter + checkpointCreationFail prometheus.Counter + checkpointCreationTotal prometheus.Counter } func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { @@ -151,9 +156,29 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_head_samples_appended_total", Help: "Total number of appended samples.", }) + m.headTruncateFail = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_truncations_failed_total", + Help: "Total number of head truncations that failed.", + }) + m.headTruncateTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_truncations_total", + Help: "Total number of head truncations attempted.", + }) m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_checkpoint_delete_fail", - Help: "Number of times deletion of old checkpoint failed.", + Name: "prometheus_tsdb_checkpoint_deletions_failed_total", + Help: "Total number of checkpoint deletions that failed.", + }) + m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_checkpoint_deletions_total", + Help: "Total number of checkpoint deletions attempted.", + }) + m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_checkpoint_creations_failed_total", + Help: "Total number of checkpoint creations that failed.", + }) + m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_checkpoint_creations_total", + Help: "Total number of checkpoint creations attempted.", }) if r != nil { @@ -171,7 +196,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.gcDuration, m.walTruncateDuration, m.samplesAppended, + m.headTruncateFail, + m.headTruncateTotal, m.checkpointDeleteFail, + m.checkpointDeleteTotal, + m.checkpointCreationFail, + m.checkpointCreationTotal, ) } return m @@ -427,7 +457,12 @@ func (h *Head) Init() error { } // Truncate removes old data before mint from the head. -func (h *Head) Truncate(mint int64) error { +func (h *Head) Truncate(mint int64) (err error) { + defer func() { + if err != nil { + h.metrics.headTruncateFail.Inc() + } + }() initialize := h.MinTime() == math.MaxInt64 if h.MinTime() >= mint && !initialize { @@ -446,6 +481,7 @@ func (h *Head) Truncate(mint int64) error { return nil } + h.metrics.headTruncateTotal.Inc() start := time.Now() h.gc() @@ -475,9 +511,25 @@ func (h *Head) Truncate(mint int64) error { keep := func(id uint64) bool { return h.series.getByID(id) != nil } - if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint, h.metrics.checkpointDeleteFail); err != nil { + h.metrics.checkpointCreationTotal.Inc() + if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil { + h.metrics.checkpointCreationFail.Inc() return errors.Wrap(err, "create checkpoint") } + if err := h.wal.Truncate(n + 1); err != nil { + // If truncating fails, we'll just try again at the next checkpoint. + // Leftover segments will just be ignored in the future if there's a checkpoint + // that supersedes them. + level.Error(h.logger).Log("msg", "truncating segments failed", "err", err) + } + h.metrics.checkpointDeleteTotal.Inc() + if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil { + // Leftover old checkpoints do not cause problems down the line beyond + // occupying disk space. + // They will just be ignored since a higher checkpoint exists. + level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err) + h.metrics.checkpointDeleteFail.Inc() + } h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) level.Info(h.logger).Log("msg", "WAL checkpoint complete", diff --git a/wal/wal.go b/wal/wal.go index ead1d546..d9a59c00 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -163,6 +163,7 @@ type WAL struct { pageFlushes prometheus.Counter pageCompletions prometheus.Counter truncateFail prometheus.Counter + truncateTotal prometheus.Counter } // New returns a new WAL over the given directory. @@ -203,11 +204,15 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi Help: "Total number of completed pages.", }) w.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_truncate_fail", - Help: "Number of times WAL truncation failed.", + Name: "prometheus_tsdb_wal_truncations_failed_total", + Help: "Total number of WAL truncations that failed.", + }) + w.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_truncations_total", + Help: "Total number of WAL truncations attempted.", }) if reg != nil { - reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail) + reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal) } _, j, err := w.Segments() @@ -532,18 +537,22 @@ func (w *WAL) Segments() (m, n int, err error) { } // Truncate drops all segments before i. -func (w *WAL) Truncate(i int) error { +func (w *WAL) Truncate(i int) (err error) { + w.truncateTotal.Inc() + defer func() { + if err != nil { + w.truncateFail.Inc() + } + }() refs, err := listSegments(w.dir) if err != nil { - w.truncateFail.Add(float64(1)) return err } for _, r := range refs { if r.n >= i { break } - if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil { - w.truncateFail.Add(float64(1)) + if err = os.Remove(filepath.Join(w.dir, r.s)); err != nil { return err } }