compact, receive, store: Init all labeled counter and histogram metrics #2893

Merged · 2 commits · Jul 23, 2020
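The pattern this PR applies throughout: counter and histogram vectors whose label values are known up front are touched once at construction, so every labeled series is exported at zero from startup instead of appearing only on first use. Otherwise `rate()` and `increase()` miss the first increment of a freshly created series, and absence-based alerts can misfire. A minimal sketch of the idea, not code from this PR (the registry, metric name, and port are illustrative):

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	reg := prometheus.NewRegistry()

	runs := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "example_runs_total",
		Help: "Runs split by result.",
	}, []string{"result"})

	// Without these two calls, /metrics would expose no example_runs_total
	// series until the first Inc(). Touching each known label value creates
	// both child series immediately, exported with a value of 0.
	runs.WithLabelValues("success")
	runs.WithLabelValues("error")

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```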
CHANGELOG.md (4 additions, 0 deletions)

@@ -25,6 +25,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 - [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page
 - [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs.
 
+### Changed
+
+- [#2893](https://github.com/thanos-io/thanos/pull/2893) Store: Rename metric `thanos_bucket_store_cached_postings_compression_time_seconds` to `thanos_bucket_store_cached_postings_compression_time_seconds_total`.
+
 ## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10
 
 ### Fixed
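A note on the rename above: Prometheus naming conventions expect counter metrics to carry a `_total` suffix, which is why the cumulative compression-time metric gains one; any dashboards or alerts referencing the old name will need updating after an upgrade.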
cmd/thanos/compact.go (6 additions, 0 deletions)

@@ -303,6 +303,12 @@ func runCompact(
 	if err := sy.SyncMetas(ctx); err != nil {
 		return errors.Wrap(err, "sync before first pass of downsampling")
 	}
+
+	for _, meta := range sy.Metas() {
+		groupKey := compact.DefaultGroupKey(meta.Thanos)
+		downsampleMetrics.downsamples.WithLabelValues(groupKey)
+		downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
+	}
 	if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
 		return errors.Wrap(err, "first pass of downsampling failed")
 	}
cmd/thanos/downsample.go (6 additions, 0 deletions)

@@ -106,6 +106,12 @@ func RunDownsample(
 	if err != nil {
 		return errors.Wrap(err, "sync before first pass of downsampling")
 	}
+
+	for _, meta := range metas {
+		groupKey := compact.DefaultGroupKey(meta.Thanos)
+		metrics.downsamples.WithLabelValues(groupKey)
+		metrics.downsampleFailures.WithLabelValues(groupKey)
+	}
 	if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
 		return errors.Wrap(err, "downsampling failed")
 	}
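In compact.go and downsample.go the label values, the compaction group keys, are only known after bucket metadata has been synced, so the same zero-initialization runs dynamically: enumerate the keys from the synced metas and touch each one. A generic sketch of that shape, with a hypothetical helper and package name:

```go
package compactutil // hypothetical package, for illustration only

import "github.com/prometheus/client_golang/prometheus"

// initPerGroupMetrics pre-creates one zero-valued child per group key so the
// per-group downsample counters are visible before any work has run. The
// groupKeys slice stands in for keys derived from a metadata sync, e.g. via
// compact.DefaultGroupKey(meta.Thanos) as in the diffs above.
func initPerGroupMetrics(downsamples, failures *prometheus.CounterVec, groupKeys []string) {
	for _, key := range groupKeys {
		downsamples.WithLabelValues(key) // creates the series at 0; no increment
		failures.WithLabelValues(key)
	}
}
```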
pkg/receive/handler.go (5 additions, 0 deletions)

@@ -123,6 +123,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 		),
 	}
 
+	h.forwardRequests.WithLabelValues(labelSuccess)
+	h.forwardRequests.WithLabelValues(labelError)
+	h.replications.WithLabelValues(labelSuccess)
+	h.replications.WithLabelValues(labelError)
+
 	if o.ReplicationFactor > 1 {
 		h.replicationFactor.Set(float64(o.ReplicationFactor))
 	} else {
pkg/replicate/replicator.go (14 additions, 4 deletions)

@@ -31,6 +31,12 @@ import (
 	"github.com/thanos-io/thanos/pkg/server/http"
 )
 
+const (
+	// Labels for metrics.
+	labelSuccess = "success"
+	labelError   = "error"
+)
+
 // ParseFlagMatchers parse flag into matchers.
 func ParseFlagMatchers(s []string) ([]*labels.Matcher, error) {
 	matchers := make([]*labels.Matcher, 0, len(s))

@@ -145,11 +151,15 @@ func RunReplicate(
 		Name: "thanos_replicate_replication_runs_total",
 		Help: "The number of replication runs split by success and error.",
 	}, []string{"result"})
+	replicationRunCounter.WithLabelValues(labelSuccess)
+	replicationRunCounter.WithLabelValues(labelError)
 
 	replicationRunDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
 		Name: "thanos_replicate_replication_run_duration_seconds",
 		Help: "The Duration of replication runs split by success and error.",
 	}, []string{"result"})
+	replicationRunDuration.WithLabelValues(labelSuccess)
+	replicationRunDuration.WithLabelValues(labelError)
 
 	fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg, nil, nil)
 	if err != nil {

@@ -196,14 +206,14 @@
 	start := time.Now()
 	if err := replicateFn(); err != nil {
 		level.Error(logger).Log("msg", "running replication failed", "err", err)
-		replicationRunCounter.WithLabelValues("error").Inc()
-		replicationRunDuration.WithLabelValues("error").Observe(time.Since(start).Seconds())
+		replicationRunCounter.WithLabelValues(labelError).Inc()
+		replicationRunDuration.WithLabelValues(labelError).Observe(time.Since(start).Seconds())
 
 		// No matter the error we want to repeat indefinitely.
 		return nil
 	}
-	replicationRunCounter.WithLabelValues("success").Inc()
-	replicationRunDuration.WithLabelValues("success").Observe(time.Since(start).Seconds())
+	replicationRunCounter.WithLabelValues(labelSuccess).Inc()
+	replicationRunDuration.WithLabelValues(labelSuccess).Observe(time.Since(start).Seconds())
 	level.Info(logger).Log("msg", "ran replication successfully")
 
 	return nil
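A quick way to verify the behavior these changes rely on, sketched as a client_golang testutil test (the test itself is illustrative and not part of this PR):

```go
package replicate_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestLabeledCounterInitializedToZero(t *testing.T) {
	reg := prometheus.NewRegistry()
	runs := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "thanos_replicate_replication_runs_total",
		Help: "The number of replication runs split by success and error.",
	}, []string{"result"})

	// Touch both label values, as RunReplicate now does at startup.
	runs.WithLabelValues("success")
	runs.WithLabelValues("error")

	// Both child series are registered and exported before any Inc().
	if got := testutil.CollectAndCount(runs); got != 2 {
		t.Fatalf("expected 2 series, got %d", got)
	}
	if v := testutil.ToFloat64(runs.WithLabelValues("error")); v != 0 {
		t.Fatalf("expected error counter at 0, got %v", v)
	}
}
```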
pkg/store/bucket.go (20 additions, 7 deletions)

@@ -82,6 +82,10 @@
 	DefaultPostingOffsetInMemorySampling = 32
 
 	partitionerMaxGapSize = 512 * 1024
+
+	// Labels for metrics.
+	labelEncode = "encode"
+	labelDecode = "decode"
 )
 
 type bucketStoreMetrics struct {

@@ -191,14 +195,23 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 		Name: "thanos_bucket_store_cached_postings_compressions_total",
 		Help: "Number of postings compressions before storing to index cache.",
 	}, []string{"op"})
+	m.cachedPostingsCompressions.WithLabelValues(labelEncode)
+	m.cachedPostingsCompressions.WithLabelValues(labelDecode)
+
 	m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: "thanos_bucket_store_cached_postings_compression_errors_total",
 		Help: "Number of postings compression errors.",
 	}, []string{"op"})
+	m.cachedPostingsCompressionErrors.WithLabelValues(labelEncode)
+	m.cachedPostingsCompressionErrors.WithLabelValues(labelDecode)
+
 	m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_cached_postings_compression_time_seconds",
+		Name: "thanos_bucket_store_cached_postings_compression_time_seconds_total",
 		Help: "Time spent compressing postings before storing them into postings cache.",
 	}, []string{"op"})
+	m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode)
+	m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode)
+
 	m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
 		Name: "thanos_bucket_store_cached_postings_original_size_bytes_total",
 		Help: "Original size of postings stored into cache.",

@@ -945,12 +958,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 	s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum))
 	s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum))
 	s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount))
-	s.metrics.cachedPostingsCompressions.WithLabelValues("encode").Add(float64(stats.cachedPostingsCompressions))
-	s.metrics.cachedPostingsCompressions.WithLabelValues("decode").Add(float64(stats.cachedPostingsDecompressions))
-	s.metrics.cachedPostingsCompressionErrors.WithLabelValues("encode").Add(float64(stats.cachedPostingsCompressionErrors))
-	s.metrics.cachedPostingsCompressionErrors.WithLabelValues("decode").Add(float64(stats.cachedPostingsDecompressionErrors))
-	s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues("encode").Add(stats.cachedPostingsCompressionTimeSum.Seconds())
-	s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues("decode").Add(stats.cachedPostingsDecompressionTimeSum.Seconds())
+	s.metrics.cachedPostingsCompressions.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressions))
+	s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressions))
+	s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressionErrors))
+	s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressionErrors))
+	s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.cachedPostingsCompressionTimeSum.Seconds())
+	s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.cachedPostingsDecompressionTimeSum.Seconds())
 	s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum))
 	s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum))