This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Fix review comments
Signed-off-by: Ganesh Vernekar <[email protected]>
codesome committed Sep 28, 2018
1 parent 632dfb3 commit 61b000e
Showing 4 changed files with 91 additions and 50 deletions.
23 changes: 1 addition & 22 deletions checkpoint.go
@@ -23,10 +23,7 @@ import (
"strconv"
"strings"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal"
)
@@ -101,12 +98,7 @@ const checkpointPrefix = "checkpoint."
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
- //
- // Non-critical errors are logged and not returned.
- func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64, checkpointDeleteFail prometheus.Counter) (*CheckpointStats, error) {
- if logger == nil {
- logger = log.NewNopLogger()
- }
+ func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
stats := &CheckpointStats{}

var sr io.Reader
@@ -273,18 +265,5 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
if err := closeAll(closers...); err != nil {
return stats, errors.Wrap(err, "close opened files")
}
- if err := w.Truncate(n + 1); err != nil {
- // If truncating fails, we'll just try again at the next checkpoint.
- // Leftover segments will just be ignored in the future if there's a checkpoint
- // that supersedes them.
- level.Error(logger).Log("msg", "truncating segments failed", "err", err)
- }
- if err := DeleteCheckpoints(w.Dir(), n); err != nil {
- // Leftover old checkpoints do not cause problems down the line beyond
- // occupying disk space.
- // They will just be ignored since a higher checkpoint exists.
- level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
- checkpointDeleteFail.Add(float64(1))
- }
return stats, nil
}
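With this change, Checkpoint no longer truncates the WAL or deletes superseded checkpoints itself; those two steps move to the caller, as head.go and the updated test below now do. A minimal caller-side sketch of that sequence, assuming w, m, n, keep, mint and a go-kit logger are already in scope as they are in Head.Truncate:

if _, err := Checkpoint(w, m, n, keep, mint); err != nil {
	return errors.Wrap(err, "create checkpoint")
}
if err := w.Truncate(n + 1); err != nil {
	// Non-critical: leftover segments are ignored once a newer checkpoint supersedes them.
	level.Error(logger).Log("msg", "truncating segments failed", "err", err)
}
if err := DeleteCheckpoints(w.Dir(), n); err != nil {
	// Non-critical: stale checkpoints only occupy disk space and are ignored
	// because a higher checkpoint exists.
	level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
}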
7 changes: 4 additions & 3 deletions checkpoint_test.go
@@ -20,7 +20,6 @@ import (
"path/filepath"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
@@ -138,10 +137,12 @@ func TestCheckpoint(t *testing.T) {
}
testutil.Ok(t, w.Close())

- _, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool {
+ _, err = Checkpoint(w, 100, 106, func(x uint64) bool {
return x%2 == 0
- }, last/2, prometheus.NewCounter(prometheus.CounterOpts{}))
+ }, last/2)
testutil.Ok(t, err)
+ testutil.Ok(t, w.Truncate(107))
+ testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))

// Only the new checkpoint should be left.
files, err := fileutil.ReadDir(dir)
88 changes: 70 additions & 18 deletions head.go
@@ -76,20 +76,25 @@ type Head struct {
}

type headMetrics struct {
- activeAppenders prometheus.Gauge
- series prometheus.Gauge
- seriesCreated prometheus.Counter
- seriesRemoved prometheus.Counter
- seriesNotFound prometheus.Counter
- chunks prometheus.Gauge
- chunksCreated prometheus.Counter
- chunksRemoved prometheus.Counter
- gcDuration prometheus.Summary
- minTime prometheus.GaugeFunc
- maxTime prometheus.GaugeFunc
- samplesAppended prometheus.Counter
- walTruncateDuration prometheus.Summary
- checkpointDeleteFail prometheus.Counter
+ activeAppenders prometheus.Gauge
+ series prometheus.Gauge
+ seriesCreated prometheus.Counter
+ seriesRemoved prometheus.Counter
+ seriesNotFound prometheus.Counter
+ chunks prometheus.Gauge
+ chunksCreated prometheus.Counter
+ chunksRemoved prometheus.Counter
+ gcDuration prometheus.Summary
+ minTime prometheus.GaugeFunc
+ maxTime prometheus.GaugeFunc
+ samplesAppended prometheus.Counter
+ walTruncateDuration prometheus.Summary
+ headTruncateFail prometheus.Counter
+ headTruncateTotal prometheus.Counter
+ checkpointDeleteFail prometheus.Counter
+ checkpointDeleteTotal prometheus.Counter
+ checkpointCreationFail prometheus.Counter
+ checkpointCreationTotal prometheus.Counter
}

func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
@@ -151,9 +156,29 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended samples.",
})
+ m.headTruncateFail = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "prometheus_tsdb_head_truncations_failed_total",
+ Help: "Total number of head truncations that failed.",
+ })
+ m.headTruncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "prometheus_tsdb_head_truncations_total",
+ Help: "Total number of head truncations attempted.",
+ })
m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_delete_fail",
Help: "Number of times deletion of old checkpoint failed.",
Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
Help: "Total number of checkpoint deletions that failed.",
})
+ m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "prometheus_tsdb_checkpoint_deletions_total",
+ Help: "Total number of checkpoint deletions attempted.",
+ })
+ m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "prometheus_tsdb_checkpoint_creations_failed_total",
+ Help: "Total number of checkpoint creations that failed.",
+ })
+ m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "prometheus_tsdb_checkpoint_creations_total",
+ Help: "Total number of checkpoint creations attempted.",
+ })

if r != nil {
@@ -171,7 +196,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.gcDuration,
m.walTruncateDuration,
m.samplesAppended,
+ m.headTruncateFail,
+ m.headTruncateTotal,
m.checkpointDeleteFail,
+ m.checkpointDeleteTotal,
+ m.checkpointCreationFail,
+ m.checkpointCreationTotal,
)
}
return m
@@ -427,7 +457,12 @@ func (h *Head) Init() error {
}

// Truncate removes old data before mint from the head.
- func (h *Head) Truncate(mint int64) error {
+ func (h *Head) Truncate(mint int64) (err error) {
+ defer func() {
+ if err != nil {
+ h.metrics.headTruncateFail.Inc()
+ }
+ }()
initialize := h.MinTime() == math.MaxInt64

if h.MinTime() >= mint && !initialize {
@@ -446,6 +481,7 @@ func (h *Head) Truncate(mint int64) error {
return nil
}

+ h.metrics.headTruncateTotal.Inc()
start := time.Now()

h.gc()
@@ -475,9 +511,25 @@ func (h *Head) Truncate(mint int64) error {
keep := func(id uint64) bool {
return h.series.getByID(id) != nil
}
- if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint, h.metrics.checkpointDeleteFail); err != nil {
+ h.metrics.checkpointCreationTotal.Inc()
+ if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil {
+ h.metrics.checkpointCreationFail.Inc()
return errors.Wrap(err, "create checkpoint")
}
+ if err := h.wal.Truncate(n + 1); err != nil {
+ // If truncating fails, we'll just try again at the next checkpoint.
+ // Leftover segments will just be ignored in the future if there's a checkpoint
+ // that supersedes them.
+ level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
+ }
+ h.metrics.checkpointDeleteTotal.Inc()
+ if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil {
+ // Leftover old checkpoints do not cause problems down the line beyond
+ // occupying disk space.
+ // They will just be ignored since a higher checkpoint exists.
+ level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
+ h.metrics.checkpointDeleteFail.Inc()
+ }
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())

level.Info(h.logger).Log("msg", "WAL checkpoint complete",
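The Head.Truncate and WAL.Truncate changes follow the same instrumentation pattern: bump an "attempted" counter on each call and bump the matching "failed" counter from a deferred check of the named error return. A generic, self-contained sketch of that pattern (the withCounters helper and the main function are illustrative, not part of this commit):

package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// withCounters counts every attempt in total and, via a deferred check of the
// named error return, every failure in failed.
func withCounters(total, failed prometheus.Counter, op func() error) (err error) {
	total.Inc()
	defer func() {
		if err != nil {
			failed.Inc()
		}
	}()
	return op()
}

func main() {
	total := prometheus.NewCounter(prometheus.CounterOpts{Name: "truncations_total"})
	failed := prometheus.NewCounter(prometheus.CounterOpts{Name: "truncations_failed_total"})

	err := withCounters(total, failed, func() error { return errors.New("disk full") })
	fmt.Println(err) // the failed call is now reflected in both counters
}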
23 changes: 16 additions & 7 deletions wal/wal.go
@@ -163,6 +163,7 @@ type WAL struct {
pageFlushes prometheus.Counter
pageCompletions prometheus.Counter
truncateFail prometheus.Counter
+ truncateTotal prometheus.Counter
}

// New returns a new WAL over the given directory.
@@ -203,11 +204,15 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
Help: "Total number of completed pages.",
})
w.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_truncate_fail",
Help: "Number of times WAL truncation failed.",
Name: "prometheus_tsdb_wal_truncations_failed_total",
Help: "Total number of WAL truncations that failed.",
})
+ w.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "prometheus_tsdb_wal_truncations_total",
+ Help: "Total number of WAL truncations attempted.",
+ })
if reg != nil {
- reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail)
+ reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal)
}

_, j, err := w.Segments()
@@ -532,18 +537,22 @@ func (w *WAL) Segments() (m, n int, err error) {
}

// Truncate drops all segments before i.
- func (w *WAL) Truncate(i int) error {
+ func (w *WAL) Truncate(i int) (err error) {
+ w.truncateTotal.Inc()
+ defer func() {
+ if err != nil {
+ w.truncateFail.Inc()
+ }
+ }()
refs, err := listSegments(w.dir)
if err != nil {
- w.truncateFail.Add(float64(1))
return err
}
for _, r := range refs {
if r.n >= i {
break
}
- if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil {
- w.truncateFail.Add(float64(1))
+ if err = os.Remove(filepath.Join(w.dir, r.s)); err != nil {
return err
}
}
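For completeness, a hedged usage sketch: wiring a registry into the WAL so the renamed truncation counters are exported, then triggering a truncation. It assumes this package's wal.New(logger, registerer, dir) constructor (only NewSize appears in the diff above), so treat the exact signature as an assumption:

package main

import (
	"log"

	kitlog "github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/tsdb/wal"
)

func main() {
	reg := prometheus.NewRegistry()

	// Assumed constructor: wal.New(logger, registerer, dir). The counters
	// prometheus_tsdb_wal_truncations_total and
	// prometheus_tsdb_wal_truncations_failed_total are registered against reg
	// when the WAL is created.
	w, err := wal.New(kitlog.NewNopLogger(), reg, "data/wal")
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	// Drop all segments before segment 5. Every call bumps the attempt
	// counter; the failure counter is only bumped when an error is returned.
	if err := w.Truncate(5); err != nil {
		log.Println("truncating segments failed:", err)
	}
}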
