From 617b50488328dd2f3ddde4212d5d25316afc2579 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Wed, 24 Apr 2019 21:52:24 +0300
Subject: [PATCH 01/22] Store: Add Time Or duration based partitioning
---
cmd/thanos/store.go | 12 +++++++
pkg/store/bucket.go | 15 ++++++++-
pkg/store/bucket_e2e_test.go | 9 ++++-
pkg/store/bucket_test.go | 3 +-
pkg/store/flag.go | 64 ++++++++++++++++++++++++++++++++++++
5 files changed, 100 insertions(+), 3 deletions(-)
create mode 100644 pkg/store/flag.go
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index d87ff2c89f..d6c8a216fa 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -49,6 +49,12 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()
+ minTime := store.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve").
+ Default("0s"))
+
+ maxTime := store.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve").
+ Default("9999-12-31T23:59:59Z"))
+
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
return runStore(g,
logger,
@@ -69,6 +75,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
debugLogging,
*syncInterval,
*blockSyncConcurrency,
+ minTime,
+ maxTime,
)
}
}
@@ -94,6 +102,8 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
+ minTime *store.TimeOrDurationValue,
+ maxTime *store.TimeOrDurationValue,
) error {
{
confContentYaml, err := objStoreConfig.Content()
@@ -135,6 +145,8 @@ func runStore(
maxConcurrent,
verbose,
blockSyncConcurrency,
+ minTime,
+ maxTime,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 47965cd21a..0f52cfb189 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -208,6 +208,9 @@ type BucketStore struct {
// samplesLimiter limits the number of samples per each Series() call.
samplesLimiter *Limiter
partitioner partitioner
+
+ minTime *TimeOrDurationValue
+ maxTime *TimeOrDurationValue
}
// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -223,6 +226,8 @@ func NewBucketStore(
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
+ minTime *TimeOrDurationValue,
+ maxTime *TimeOrDurationValue,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -242,6 +247,7 @@ func NewBucketStore(
metrics := newBucketStoreMetrics(reg)
s := &BucketStore{
logger: logger,
+ metrics: metrics,
bucket: bucket,
dir: dir,
indexCache: indexCache,
@@ -256,8 +262,9 @@ func NewBucketStore(
),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
+ minTime: minTime,
+ maxTime: maxTime,
}
- s.metrics = metrics
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, errors.Wrap(err, "create dir")
@@ -409,6 +416,12 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
if err != nil {
return errors.Wrap(err, "new bucket block")
}
+
+ // We check for blocks in configured minTime, maxTime range
+ if b.meta.MinTime < s.minTime.PrometheusTimestamp() || b.meta.MinTime > s.maxTime.PrometheusTimestamp() {
+ return nil
+ }
+
s.mtx.Lock()
defer s.mtx.Unlock()
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 2185e0d161..ef5a751fbe 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -56,6 +56,13 @@ func (c *swappableCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
return c.ptr.Series(b, id)
}
+var (
+ zeroDur = time.Duration(0)
+ maxTime = time.Unix(1<<63-1, 0)
+ minTimeDuration = &TimeOrDurationValue{dur: &zeroDur}
+ maxTimeDuration = &TimeOrDurationValue{t: &maxTime}
+)
+
type storeSuite struct {
cancel context.CancelFunc
wg sync.WaitGroup
@@ -128,7 +135,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
testutil.Ok(t, os.RemoveAll(dir2))
}
- store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20)
+ store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, minTimeDuration, maxTimeDuration)
testutil.Ok(t, err)
s.store = store
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 0a85f7cc49..ccf678ef58 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -416,7 +416,8 @@ func TestBucketStore_Info(t *testing.T) {
dir, err := ioutil.TempDir("", "prometheus-test")
testutil.Ok(t, err)
- bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20)
+ bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, minTimeDuration, maxTimeDuration)
+
testutil.Ok(t, err)
resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
diff --git a/pkg/store/flag.go b/pkg/store/flag.go
new file mode 100644
index 0000000000..41f2f890a0
--- /dev/null
+++ b/pkg/store/flag.go
@@ -0,0 +1,64 @@
+package store
+
+import (
+ "time"
+
+ "github.com/prometheus/prometheus/pkg/timestamp"
+ "gopkg.in/alecthomas/kingpin.v2"
+)
+
+// TimeOrDurationValue is a custom kingping parser for time in RFC3339
+// or duration in Go's duration format, such as "300ms", "-1.5h" or "2h45m".
+// Only one will be set.
+type TimeOrDurationValue struct {
+ t *time.Time
+ dur *time.Duration
+}
+
+// Set converts string to TimeOrDurationValue
+func (tdv *TimeOrDurationValue) Set(s string) error {
+ t, err := time.Parse(time.RFC3339, s)
+ if err != nil {
+ dur, err := time.ParseDuration(s)
+ if err != nil {
+ return err
+ }
+ tdv.dur = &dur
+ return nil
+ }
+
+ tdv.t = &t
+ return nil
+}
+
+// String returns either tume or duration
+func (tdv *TimeOrDurationValue) String() string {
+ switch {
+ case tdv.t != nil:
+ return tdv.t.String()
+ case tdv.dur != nil:
+ return tdv.dur.String()
+ }
+
+ return "nil"
+}
+
+// PrometheusTimestamp returns TimeOrDurationValue converted to PrometheusTimestamp
+// if duration is set now+duration is converted to Timestamp.
+func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 {
+ switch {
+ case tdv.t != nil:
+ return timestamp.FromTime(*tdv.t)
+ case tdv.dur != nil:
+ return timestamp.FromTime(time.Now().Add(*tdv.dur))
+ }
+
+ return 0
+}
+
+// TimeOrDuration helper for parsing TimeOrDuration with kingpin
+func TimeOrDuration(flags *kingpin.FlagClause) *TimeOrDurationValue {
+ value := new(TimeOrDurationValue)
+ flags.SetValue(value)
+ return value
+}
From 7350c994fee03ab7c641c858b3562c49555782b6 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Wed, 24 Apr 2019 21:59:07 +0300
Subject: [PATCH 02/22] Add docs
---
docs/components/store.md | 3 +++
1 file changed, 3 insertions(+)
diff --git a/docs/components/store.md b/docs/components/store.md
index 3bc0c1e4ef..f117badc41 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -84,5 +84,8 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing blocks
from object storage.
+ --min-time=0s Start of time range limit to serve
+ --max-time=9999-12-31T23:59:59Z
+ End of time range limit to serve
```
From 564fafa939d37ab4686a4dda8de6976143ce3286 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Thu, 25 Apr 2019 07:26:42 +0300
Subject: [PATCH 03/22] Add flag tests
---
pkg/store/flag_test.go | 36 ++++++++++++++++++++++++++++++++++++
1 file changed, 36 insertions(+)
create mode 100644 pkg/store/flag_test.go
diff --git a/pkg/store/flag_test.go b/pkg/store/flag_test.go
new file mode 100644
index 0000000000..571651fc1c
--- /dev/null
+++ b/pkg/store/flag_test.go
@@ -0,0 +1,36 @@
+package store_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/improbable-eng/thanos/pkg/store"
+ "github.com/improbable-eng/thanos/pkg/testutil"
+ "github.com/prometheus/prometheus/pkg/timestamp"
+ "gopkg.in/alecthomas/kingpin.v2"
+)
+
+func TestTimeOrDurationValue(t *testing.T) {
+ cmd := kingpin.New("test", "test")
+
+ minTime := store.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve"))
+
+ maxTime := store.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve").
+ Default("9999-12-31T23:59:59Z"))
+
+ _, err := cmd.Parse([]string{"--min-time", "10s"})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ testutil.Equals(t, "10s", minTime.String())
+ testutil.Equals(t, "9999-12-31 23:59:59 +0000 UTC", maxTime.String())
+
+ prevTime := timestamp.FromTime(time.Now())
+ afterTime := timestamp.FromTime(time.Now().Add(15 * time.Second))
+
+ testutil.Assert(t, minTime.PrometheusTimestamp() > prevTime, "minTime prometheus timestamp is less than time now.")
+ testutil.Assert(t, minTime.PrometheusTimestamp() < afterTime, "minTime prometheus timestamp is more than time now + 15s")
+
+ testutil.Assert(t, 253402300799000 == maxTime.PrometheusTimestamp(), "maxTime is not equal to 253402300799000")
+}
From dc871496dd688f9bc6bd520a041646ded80eb4cc Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Thu, 25 Apr 2019 07:43:26 +0300
Subject: [PATCH 04/22] Add tests
---
cmd/thanos/store.go | 6 +++---
docs/components/store.md | 13 +++++++++++--
pkg/store/bucket_e2e_test.go | 4 ++--
pkg/store/flag.go | 8 +++++---
4 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index d6c8a216fa..cb1562e38a 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -49,10 +49,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()
- minTime := store.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve").
- Default("0s"))
+ minTime := store.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ Default("0000-01-01T00:00:00Z"))
- maxTime := store.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve").
+ maxTime := store.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
diff --git a/docs/components/store.md b/docs/components/store.md
index f117badc41..20472b995c 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -84,8 +84,17 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing blocks
from object storage.
- --min-time=0s Start of time range limit to serve
+ --min-time=0000-01-01T00:00:00Z
+ Start of time range limit to serve. Option can
+ be a constant time in RFC3339 format or time
+ duration relative to current time, such as
+ -1.5d or 2h45m. Valid duration units are ms, s,
+ m, h, d, w, y.
--max-time=9999-12-31T23:59:59Z
- End of time range limit to serve
+ End of time range limit to serve. Option can be
+ a constant time in RFC3339 format or time
+ duration relative to current time, such as
+ -1.5d or 2h45m. Valid duration units are ms, s,
+ m, h, d, w, y.
```
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index ef5a751fbe..c492ab3066 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -57,9 +57,9 @@ func (c *swappableCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
}
var (
- zeroDur = time.Duration(0)
+ minTime = time.Unix(0, 0)
maxTime = time.Unix(1<<63-1, 0)
- minTimeDuration = &TimeOrDurationValue{dur: &zeroDur}
+ minTimeDuration = &TimeOrDurationValue{t: &minTime}
maxTimeDuration = &TimeOrDurationValue{t: &maxTime}
)
diff --git a/pkg/store/flag.go b/pkg/store/flag.go
index 41f2f890a0..e0c355ab60 100644
--- a/pkg/store/flag.go
+++ b/pkg/store/flag.go
@@ -3,6 +3,7 @@ package store
import (
"time"
+ "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/timestamp"
"gopkg.in/alecthomas/kingpin.v2"
)
@@ -12,17 +13,18 @@ import (
// Only one will be set.
type TimeOrDurationValue struct {
t *time.Time
- dur *time.Duration
+ dur *model.Duration
}
// Set converts string to TimeOrDurationValue
func (tdv *TimeOrDurationValue) Set(s string) error {
t, err := time.Parse(time.RFC3339, s)
if err != nil {
- dur, err := time.ParseDuration(s)
+ dur, err := model.ParseDuration(s)
if err != nil {
return err
}
+
tdv.dur = &dur
return nil
}
@@ -50,7 +52,7 @@ func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 {
case tdv.t != nil:
return timestamp.FromTime(*tdv.t)
case tdv.dur != nil:
- return timestamp.FromTime(time.Now().Add(*tdv.dur))
+ return timestamp.FromTime(time.Now().Add(time.Duration(*tdv.dur)))
}
return 0
From d158f7edc3042aca75beeaf709acd7ddf37bb0a9 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Thu, 25 Apr 2019 08:11:06 +0300
Subject: [PATCH 05/22] Skip loading cache for unneeded blocks
---
pkg/store/bucket.go | 35 ++++++++++++++++++++++++++++++-----
1 file changed, 30 insertions(+), 5 deletions(-)
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 0f52cfb189..78d1f11dbf 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -316,6 +316,17 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
if err != nil {
return nil
}
+
+ inRange, err := s.isBlockInMinMaxRange(ctx, id)
+ if err != nil {
+ level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err)
+ return nil
+ }
+
+ if !inRange {
+ return nil
+ }
+
allIDs[id] = struct{}{}
if b := s.getBlock(id); b != nil {
@@ -384,6 +395,25 @@ func (s *BucketStore) numBlocks() int {
return len(s.blocks)
}
+func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) {
+ b := &bucketBlock{
+ logger: s.logger,
+ bucket: s.bucket,
+ id: id,
+ dir: s.dir,
+ }
+ if err := b.loadMeta(ctx, id); err != nil {
+ return false, err
+ }
+
+ // We check for blocks in configured minTime, maxTime range
+ if b.meta.MinTime < s.minTime.PrometheusTimestamp() || b.meta.MinTime > s.maxTime.PrometheusTimestamp() {
+ return false, nil
+ }
+
+ return true, nil
+}
+
func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
s.mtx.RLock()
defer s.mtx.RUnlock()
@@ -417,11 +447,6 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
return errors.Wrap(err, "new bucket block")
}
- // We check for blocks in configured minTime, maxTime range
- if b.meta.MinTime < s.minTime.PrometheusTimestamp() || b.meta.MinTime > s.maxTime.PrometheusTimestamp() {
- return nil
- }
-
s.mtx.Lock()
defer s.mtx.Unlock()
From 2823b76d4e53555fdc19197ca5d1d89c294cd017 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Thu, 25 Apr 2019 08:48:16 +0300
Subject: [PATCH 06/22] Add proper tests
---
pkg/store/bucket.go | 6 +++--
pkg/store/bucket_test.go | 58 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 62 insertions(+), 2 deletions(-)
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 78d1f11dbf..d9ccc813e3 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -396,18 +396,20 @@ func (s *BucketStore) numBlocks() int {
}
func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) {
+ dir := filepath.Join(s.dir, id.String())
+
b := &bucketBlock{
logger: s.logger,
bucket: s.bucket,
id: id,
- dir: s.dir,
+ dir: dir,
}
if err := b.loadMeta(ctx, id); err != nil {
return false, err
}
// We check for blocks in configured minTime, maxTime range
- if b.meta.MinTime < s.minTime.PrometheusTimestamp() || b.meta.MinTime > s.maxTime.PrometheusTimestamp() {
+ if b.meta.MinTime < s.minTime.PrometheusTimestamp() || b.meta.MaxTime > s.maxTime.PrometheusTimestamp() {
return false, nil
}
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index ccf678ef58..19d2dadc2a 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -4,14 +4,23 @@ import (
"context"
"io/ioutil"
"math"
+ "path/filepath"
"testing"
"time"
"github.com/fortytw2/leaktest"
+ "github.com/go-kit/kit/log"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+ "github.com/improbable-eng/thanos/pkg/compact/downsample"
+ "github.com/improbable-eng/thanos/pkg/objstore/inmem"
+ "github.com/improbable-eng/thanos/pkg/store/storepb"
+ "github.com/improbable-eng/thanos/pkg/testutil"
"github.com/leanovate/gopter"
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/oklog/ulid"
+ "github.com/prometheus/common/model"
+ "github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
@@ -427,3 +436,52 @@ func TestBucketStore_Info(t *testing.T) {
testutil.Equals(t, int64(math.MaxInt64), resp.MinTime)
testutil.Equals(t, int64(math.MinInt64), resp.MaxTime)
}
+
+func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
+ ctx := context.TODO()
+ dir, err := ioutil.TempDir("", "block-min-max-test")
+ testutil.Ok(t, err)
+
+ series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")}
+ extLset := labels.FromStrings("ext1", "value1")
+
+ // Create a block in range [-2w, -1w]
+ id1, err := testutil.CreateBlock(ctx, dir, series, 10,
+ timestamp.FromTime(time.Now().Add(-14*24*time.Hour)),
+ timestamp.FromTime(time.Now().Add(-7*24*time.Hour)),
+ extLset, 0)
+ testutil.Ok(t, err)
+
+ // Create a block in range [-1w, 0w]
+ id2, err := testutil.CreateBlock(ctx, dir, series, 10,
+ timestamp.FromTime(time.Now().Add(-7*24*time.Hour)),
+ timestamp.FromTime(time.Now().Add(-0*24*time.Hour)),
+ extLset, 0)
+ testutil.Ok(t, err)
+
+ dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String())
+ meta1, err := metadata.Read(dir1)
+ testutil.Ok(t, err)
+ testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir1, meta1))
+
+ meta2, err := metadata.Read(dir2)
+ testutil.Ok(t, err)
+ testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta2))
+
+ // Run actual test
+
+ zeroTime := time.Unix(0, 0)
+ hourBefore := model.Duration(-1 * time.Hour)
+ minTime := &TimeOrDurationValue{t: &zeroTime}
+ maxTime := &TimeOrDurationValue{dur: &hourBefore}
+
+ bucketStore, err := NewBucketStore(nil, nil, inmem.NewBucket(), dir, 2e5, 2e5, 0, 0, false, 20,
+ minTime, maxTime)
+
+ inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1)
+ testutil.Ok(t, err)
+ testutil.Equals(t, true, inRange)
+
+ inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id2)
+ testutil.Equals(t, false, inRange)
+}
From 47cff11394725cb09b565c9256543a6af0ba15de Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Thu, 25 Apr 2019 10:05:02 +0300
Subject: [PATCH 07/22] Add sanity check
---
cmd/thanos/store.go | 5 +++++
pkg/store/bucket_e2e_test.go | 2 +-
pkg/store/bucket_test.go | 2 +-
3 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index cb1562e38a..4db28ba325 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -56,6 +56,11 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
Default("9999-12-31T23:59:59Z"))
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
+ // Sanity check minTime
+ if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
+ return errors.Errorf("error min-time '%s' can't be greater than max-time '%s'", minTime, maxTime)
+ }
+
return runStore(g,
logger,
reg,
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index c492ab3066..ed0022f1bd 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -58,7 +58,7 @@ func (c *swappableCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
var (
minTime = time.Unix(0, 0)
- maxTime = time.Unix(1<<63-1, 0)
+ maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
minTimeDuration = &TimeOrDurationValue{t: &minTime}
maxTimeDuration = &TimeOrDurationValue{t: &maxTime}
)
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 19d2dadc2a..410d1f27fa 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -469,12 +469,12 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta2))
// Run actual test
-
zeroTime := time.Unix(0, 0)
hourBefore := model.Duration(-1 * time.Hour)
minTime := &TimeOrDurationValue{t: &zeroTime}
maxTime := &TimeOrDurationValue{dur: &hourBefore}
+ // bucketStore accepts blocks in range [0, now-1h]
bucketStore, err := NewBucketStore(nil, nil, inmem.NewBucket(), dir, 2e5, 2e5, 0, 0, false, 20,
minTime, maxTime)
From b2f122cfcdeac80cc506b4f19a227933b74c7a34 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Thu, 25 Apr 2019 10:32:30 +0300
Subject: [PATCH 08/22] Add support for negative duration flags
---
pkg/store/flag.go | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/pkg/store/flag.go b/pkg/store/flag.go
index e0c355ab60..3339567491 100644
--- a/pkg/store/flag.go
+++ b/pkg/store/flag.go
@@ -20,11 +20,19 @@ type TimeOrDurationValue struct {
func (tdv *TimeOrDurationValue) Set(s string) error {
t, err := time.Parse(time.RFC3339, s)
if err != nil {
+ var minus bool
+ if s[0] == '-' {
+ minus = true
+ s = s[1:]
+ }
dur, err := model.ParseDuration(s)
if err != nil {
return err
}
+ if minus {
+ dur = dur * -1
+ }
tdv.dur = &dur
return nil
}
From 69eff20ee96875bfa1df14fa9c1298ecb94f6b73 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bartek=20P=C5=82otka?=
Date: Fri, 3 May 2019 06:14:30 +0300
Subject: [PATCH 09/22] Apply suggestions from code review
Co-Authored-By: povilasv
---
cmd/thanos/store.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 4db28ba325..c2a6937219 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -58,7 +58,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
// Sanity check minTime
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
- return errors.Errorf("error min-time '%s' can't be greater than max-time '%s'", minTime, maxTime)
+ return errors.Errorf("invalid argument: min-time '%s' can't be greater than max-time '%s'", minTime, maxTime)
}
return runStore(g,
From d0c697a12756f5353334eb256638a9dfaf9a973e Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Fri, 3 May 2019 06:23:02 +0300
Subject: [PATCH 10/22] Fix trailling period
---
cmd/thanos/store.go | 2 +-
pkg/compact/compact_test.go | 2 ++
pkg/store/bucket.go | 2 +-
3 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index c2a6937219..ba9bce0e1d 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -56,7 +56,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
Default("9999-12-31T23:59:59Z"))
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
- // Sanity check minTime
+ // Sanity check minTime.
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: min-time '%s' can't be greater than max-time '%s'", minTime, maxTime)
}
diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go
index 4148f5a8dd..20989fd253 100644
--- a/pkg/compact/compact_test.go
+++ b/pkg/compact/compact_test.go
@@ -7,6 +7,8 @@ import (
"testing"
"time"
+ "github.com/improbable-eng/thanos/pkg/objstore/inmem"
+ "github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
terrors "github.com/prometheus/tsdb/errors"
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index d9ccc813e3..1fafa6a809 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -408,7 +408,7 @@ func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (b
return false, err
}
- // We check for blocks in configured minTime, maxTime range
+ // We check for blocks in configured minTime, maxTime range.
if b.meta.MinTime < s.minTime.PrometheusTimestamp() || b.meta.MaxTime > s.maxTime.PrometheusTimestamp() {
return false, nil
}
From e16c7bfaecadf5896623991e71f83e50c7ad3886 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Fri, 3 May 2019 07:02:12 +0300
Subject: [PATCH 11/22] Fix tests
---
pkg/store/bucket_test.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 410d1f27fa..d135de4c71 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -475,7 +475,7 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
maxTime := &TimeOrDurationValue{dur: &hourBefore}
// bucketStore accepts blocks in range [0, now-1h]
- bucketStore, err := NewBucketStore(nil, nil, inmem.NewBucket(), dir, 2e5, 2e5, 0, 0, false, 20,
+ bucketStore, err := NewBucketStore(nil, nil, inmem.NewBucket(), dir, noopCache{}, 0, 0, 20, false, 20,
minTime, maxTime)
inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1)
From 2a9a7edc6cb1659a384e765505d32771b26aed98 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Fri, 3 May 2019 07:12:43 +0300
Subject: [PATCH 12/22] Refactor flag into separate package
---
cmd/thanos/store.go | 14 +++--
pkg/{store/flag.go => model/timeduration.go} | 55 ++++++++++---------
.../timeduration_test.go} | 8 +--
pkg/store/bucket.go | 21 +++++--
pkg/store/bucket_e2e_test.go | 8 +++
pkg/store/bucket_test.go | 9 +--
6 files changed, 72 insertions(+), 43 deletions(-)
rename pkg/{store/flag.go => model/timeduration.go} (67%)
rename pkg/{store/flag_test.go => model/timeduration_test.go} (84%)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index ba9bce0e1d..76eedebf4f 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -7,6 +7,12 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
+ "github.com/improbable-eng/thanos/pkg/model"
+ "github.com/improbable-eng/thanos/pkg/objstore/client"
+ "github.com/improbable-eng/thanos/pkg/runutil"
+ "github.com/improbable-eng/thanos/pkg/store"
+ storecache "github.com/improbable-eng/thanos/pkg/store/cache"
+ "github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
@@ -49,10 +55,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()
- minTime := store.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
- maxTime := store.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
@@ -107,8 +113,8 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
- minTime *store.TimeOrDurationValue,
- maxTime *store.TimeOrDurationValue,
+ minTime *model.TimeOrDurationValue,
+ maxTime *model.TimeOrDurationValue,
) error {
{
confContentYaml, err := objStoreConfig.Content()
diff --git a/pkg/store/flag.go b/pkg/model/timeduration.go
similarity index 67%
rename from pkg/store/flag.go
rename to pkg/model/timeduration.go
index 3339567491..2721ba2bf5 100644
--- a/pkg/store/flag.go
+++ b/pkg/model/timeduration.go
@@ -1,4 +1,4 @@
-package store
+package model
import (
"time"
@@ -12,42 +12,43 @@ import (
// or duration in Go's duration format, such as "300ms", "-1.5h" or "2h45m".
// Only one will be set.
type TimeOrDurationValue struct {
- t *time.Time
- dur *model.Duration
+ Time *time.Time
+ Dur *model.Duration
}
// Set converts string to TimeOrDurationValue
func (tdv *TimeOrDurationValue) Set(s string) error {
t, err := time.Parse(time.RFC3339, s)
- if err != nil {
- var minus bool
- if s[0] == '-' {
- minus = true
- s = s[1:]
- }
- dur, err := model.ParseDuration(s)
- if err != nil {
- return err
- }
-
- if minus {
- dur = dur * -1
- }
- tdv.dur = &dur
+ if err == nil {
+ tdv.Time = &t
return nil
}
- tdv.t = &t
+ // error parsing time, let's try duration.
+ var minus bool
+ if s[0] == '-' {
+ minus = true
+ s = s[1:]
+ }
+ dur, err := model.ParseDuration(s)
+ if err != nil {
+ return err
+ }
+
+ if minus {
+ dur = dur * -1
+ }
+ tdv.Dur = &dur
return nil
}
// String returns either tume or duration
func (tdv *TimeOrDurationValue) String() string {
switch {
- case tdv.t != nil:
- return tdv.t.String()
- case tdv.dur != nil:
- return tdv.dur.String()
+ case tdv.Time != nil:
+ return tdv.Time.String()
+ case tdv.Dur != nil:
+ return tdv.Dur.String()
}
return "nil"
@@ -57,10 +58,10 @@ func (tdv *TimeOrDurationValue) String() string {
// if duration is set now+duration is converted to Timestamp.
func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 {
switch {
- case tdv.t != nil:
- return timestamp.FromTime(*tdv.t)
- case tdv.dur != nil:
- return timestamp.FromTime(time.Now().Add(time.Duration(*tdv.dur)))
+ case tdv.Time != nil:
+ return timestamp.FromTime(*tdv.Time)
+ case tdv.Dur != nil:
+ return timestamp.FromTime(time.Now().Add(time.Duration(*tdv.Dur)))
}
return 0
diff --git a/pkg/store/flag_test.go b/pkg/model/timeduration_test.go
similarity index 84%
rename from pkg/store/flag_test.go
rename to pkg/model/timeduration_test.go
index 571651fc1c..b00a491b5f 100644
--- a/pkg/store/flag_test.go
+++ b/pkg/model/timeduration_test.go
@@ -1,10 +1,10 @@
-package store_test
+package model_test
import (
"testing"
"time"
- "github.com/improbable-eng/thanos/pkg/store"
+ "github.com/improbable-eng/thanos/pkg/model"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/prometheus/prometheus/pkg/timestamp"
"gopkg.in/alecthomas/kingpin.v2"
@@ -13,9 +13,9 @@ import (
func TestTimeOrDurationValue(t *testing.T) {
cmd := kingpin.New("test", "test")
- minTime := store.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve"))
+ minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve"))
- maxTime := store.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve").
+ maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve").
Default("9999-12-31T23:59:59Z"))
_, err := cmd.Parse([]string{"--min-time", "10s"})
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 1fafa6a809..83e0ceb489 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -18,6 +18,19 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
+ "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+ "github.com/improbable-eng/thanos/pkg/compact/downsample"
+ "github.com/improbable-eng/thanos/pkg/component"
+ "github.com/improbable-eng/thanos/pkg/extprom"
+ "github.com/improbable-eng/thanos/pkg/model"
+ "github.com/improbable-eng/thanos/pkg/objstore"
+ "github.com/improbable-eng/thanos/pkg/pool"
+ "github.com/improbable-eng/thanos/pkg/runutil"
+ "github.com/improbable-eng/thanos/pkg/store/storepb"
+ "github.com/improbable-eng/thanos/pkg/strutil"
+ "github.com/improbable-eng/thanos/pkg/tracing"
+>>>>>>> Refactor flag into separate package
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/pkg/errors"
@@ -209,8 +222,8 @@ type BucketStore struct {
samplesLimiter *Limiter
partitioner partitioner
- minTime *TimeOrDurationValue
- maxTime *TimeOrDurationValue
+ minTime *model.TimeOrDurationValue
+ maxTime *model.TimeOrDurationValue
}
// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -226,8 +239,8 @@ func NewBucketStore(
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
- minTime *TimeOrDurationValue,
- maxTime *TimeOrDurationValue,
+ minTime *model.TimeOrDurationValue,
+ maxTime *model.TimeOrDurationValue,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index ed0022f1bd..95e08252bf 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -12,6 +12,14 @@ import (
"github.com/oklog/ulid"
"github.com/go-kit/kit/log"
+ "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/metadata"
+ "github.com/improbable-eng/thanos/pkg/objstore"
+ "github.com/improbable-eng/thanos/pkg/objstore/objtesting"
+ "github.com/improbable-eng/thanos/pkg/runutil"
+ storecache "github.com/improbable-eng/thanos/pkg/store/cache"
+ "github.com/improbable-eng/thanos/pkg/store/storepb"
+ "github.com/improbable-eng/thanos/pkg/testutil"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb/labels"
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index d135de4c71..64daca7ff5 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -12,6 +12,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
+ "github.com/improbable-eng/thanos/pkg/model"
"github.com/improbable-eng/thanos/pkg/objstore/inmem"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/testutil"
@@ -19,7 +20,7 @@ import (
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/oklog/ulid"
- "github.com/prometheus/common/model"
+ prommodel "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -470,9 +471,9 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
// Run actual test
zeroTime := time.Unix(0, 0)
- hourBefore := model.Duration(-1 * time.Hour)
- minTime := &TimeOrDurationValue{t: &zeroTime}
- maxTime := &TimeOrDurationValue{dur: &hourBefore}
+ hourBefore := prommodel.Duration(-1 * time.Hour)
+ minTime := &model.TimeOrDurationValue{Time: &zeroTime}
+ maxTime := &model.TimeOrDurationValue{Dur: &hourBefore}
// bucketStore accepts blocks in range [0, now-1h]
bucketStore, err := NewBucketStore(nil, nil, inmem.NewBucket(), dir, noopCache{}, 0, 0, 20, false, 20,
From b2fea49f325c95fa0672069310d6c8c339a91a72 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Fri, 3 May 2019 08:08:43 +0300
Subject: [PATCH 13/22] Add docs & changelog
---
docs/components/store.md | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/docs/components/store.md b/docs/components/store.md
index 20472b995c..e6358d26ff 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -98,3 +98,15 @@ Flags:
m, h, d, w, y.
```
+
+## Time & Duration based partioning
+
+By default Thanos Store Gateway looks at all the data in Object Store and returns it based on query's time range.
+You can shard Thanos Store gateway based on time or duration relative to current time.
+
+For example, setting: `--min-time=-6w` & `--max-time=-2w` will make Thanos Store Gateway look at blocks that fall within `now - 6 weeks` up to `now - 2 weeks`.
+
+You can also set constant time in RFC3339 format. For example, `--min-time=2018-01-01T00:00:00Z`, `--max-time=2019-01-01T23:59:59Z`
+
+Note that Thanos Store Gateway looks at both block's minTime and maxTime. This means that both block start time and end time needs to fall within the provided range. Therefore if block's start time is less than `--min-time`, the block won't be included as well as if block's end time is more than `--max-time`, the block won't be included.
+
From f69d53609fa03496ca646cc329bfd4f4f0388cac Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Fri, 3 May 2019 08:19:59 +0300
Subject: [PATCH 14/22] Improve docs
---
docs/components/store.md | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/docs/components/store.md b/docs/components/store.md
index e6358d26ff..fe43a328a8 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -104,9 +104,8 @@ Flags:
By default Thanos Store Gateway looks at all the data in Object Store and returns it based on query's time range.
You can shard Thanos Store gateway based on time or duration relative to current time.
-For example, setting: `--min-time=-6w` & `--max-time=-2w` will make Thanos Store Gateway look at blocks that fall within `now - 6 weeks` up to `now - 2 weeks`.
+For example setting: `--min-time=-6w` & `--max-time=-2w` will make Thanos Store Gateway look at blocks that fall within `now - 6 weeks` up to `now - 2 weeks`.
-You can also set constant time in RFC3339 format. For example, `--min-time=2018-01-01T00:00:00Z`, `--max-time=2019-01-01T23:59:59Z`
+You can also set constant time in RFC3339 format. For example `--min-time=2018-01-01T00:00:00Z`, `--max-time=2019-01-01T23:59:59Z`.
Note that Thanos Store Gateway looks at both block's minTime and maxTime. This means that both block start time and end time needs to fall within the provided range. Therefore if block's start time is less than `--min-time`, the block won't be included as well as if block's end time is more than `--max-time`, the block won't be included.
-
From 6fb27079bb9492bc8a96142eaaa5103a7d507cd8 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Tue, 7 May 2019 15:14:12 +0300
Subject: [PATCH 15/22] Fix thanos store to check only mintime
---
cmd/thanos/store.go | 4 ++--
docs/components/store.md | 23 +++++++++++++++--------
go.mod | 2 ++
pkg/store/bucket.go | 2 +-
4 files changed, 20 insertions(+), 11 deletions(-)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 76eedebf4f..cd79620194 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -55,10 +55,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()
- minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have start time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
- maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which have start time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
diff --git a/docs/components/store.md b/docs/components/store.md
index fe43a328a8..c8f3bb0ca6 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -85,17 +85,21 @@ Flags:
Number of goroutines to use when syncing blocks
from object storage.
--min-time=0000-01-01T00:00:00Z
- Start of time range limit to serve. Option can
- be a constant time in RFC3339 format or time
+ Start of time range limit to serve. Thanos
+ Store serves only blocks, which have start time
+ greater than this value. Option can be a
+ constant time in RFC3339 format or time
duration relative to current time, such as
-1.5d or 2h45m. Valid duration units are ms, s,
m, h, d, w, y.
--max-time=9999-12-31T23:59:59Z
- End of time range limit to serve. Option can be
- a constant time in RFC3339 format or time
- duration relative to current time, such as
- -1.5d or 2h45m. Valid duration units are ms, s,
- m, h, d, w, y.
+ End of time range limit to serve. Thanos Store
+ serves only blocks, which have start time is
+ less than this value. Option can be a constant
+ time in RFC3339 format or time duration
+ relative to current time, such as -1.5d or
+ 2h45m. Valid duration units are ms, s, m, h, d,
+ w, y.
```
@@ -108,4 +112,7 @@ For example setting: `--min-time=-6w` & `--max-time=-2w` will make Thanos Store
You can also set constant time in RFC3339 format. For example `--min-time=2018-01-01T00:00:00Z`, `--max-time=2019-01-01T23:59:59Z`.
-Note that Thanos Store Gateway looks at both block's minTime and maxTime. This means that both block start time and end time needs to fall within the provided range. Therefore if block's start time is less than `--min-time`, the block won't be included as well as if block's end time is more than `--max-time`, the block won't be included.
+There is a sync-block job, which syncs up with remote storage and filters out blocks. You can configure how often it runs via `--sync-block-duration=3m`. In most cases default should work well.
+
+Note that Thanos Store Gateway only looks at block's start time. Therefore if block's start time is less than `--min-time`, the block won't be included as well as if it is more than `--max-time`.
+
diff --git a/go.mod b/go.mod
index e75e8c131d..08e2915c79 100644
--- a/go.mod
+++ b/go.mod
@@ -49,3 +49,5 @@ require (
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.2.2
)
+
+go 1.13
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 83e0ceb489..7e27adc56b 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -422,7 +422,7 @@ func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (b
}
// We check for blocks in configured minTime, maxTime range.
- if b.meta.MinTime < s.minTime.PrometheusTimestamp() || b.meta.MaxTime > s.maxTime.PrometheusTimestamp() {
+ if b.meta.MinTime < s.minTime.PrometheusTimestamp() || b.meta.MinTime > s.maxTime.PrometheusTimestamp() {
return false, nil
}
From 4091901f3e2060599b3bff81abe7bda43f9ccb50 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Tue, 7 May 2019 15:57:39 +0300
Subject: [PATCH 16/22] Fix tests
---
pkg/store/bucket_test.go | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 64daca7ff5..2141e6238d 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -460,6 +460,13 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
extLset, 0)
testutil.Ok(t, err)
+ // Create a block in range [+1w, +2w]
+ id3, err := testutil.CreateBlock(ctx, dir, series, 10,
+ timestamp.FromTime(time.Now().Add(7*24*time.Hour)),
+ timestamp.FromTime(time.Now().Add(14*24*time.Hour)),
+ extLset, 0)
+ testutil.Ok(t, err)
+
dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String())
meta1, err := metadata.Read(dir1)
testutil.Ok(t, err)
@@ -484,5 +491,9 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
testutil.Equals(t, true, inRange)
inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id2)
+ testutil.Equals(t, true, inRange)
+
+ inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id3)
testutil.Equals(t, false, inRange)
+
}
From 82798b4b3a5519de563c4e998a89f4f31946a605 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Wed, 8 May 2019 07:40:04 +0300
Subject: [PATCH 17/22] Fix go.mod
---
go.mod | 2 --
1 file changed, 2 deletions(-)
diff --git a/go.mod b/go.mod
index 08e2915c79..e75e8c131d 100644
--- a/go.mod
+++ b/go.mod
@@ -49,5 +49,3 @@ require (
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.2.2
)
-
-go 1.13
From be225ae51ba109168e95b4e97a039f6f53d218de Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Wed, 8 May 2019 07:41:00 +0300
Subject: [PATCH 18/22] Fixes after review
---
pkg/model/timeduration.go | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/pkg/model/timeduration.go b/pkg/model/timeduration.go
index 2721ba2bf5..bbe766043f 100644
--- a/pkg/model/timeduration.go
+++ b/pkg/model/timeduration.go
@@ -16,7 +16,7 @@ type TimeOrDurationValue struct {
Dur *model.Duration
}
-// Set converts string to TimeOrDurationValue
+// Set converts string to TimeOrDurationValue.
func (tdv *TimeOrDurationValue) Set(s string) error {
t, err := time.Parse(time.RFC3339, s)
if err == nil {
@@ -42,7 +42,7 @@ func (tdv *TimeOrDurationValue) Set(s string) error {
return nil
}
-// String returns either tume or duration
+// String returns either tume or duration.
func (tdv *TimeOrDurationValue) String() string {
switch {
case tdv.Time != nil:
@@ -67,7 +67,7 @@ func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 {
return 0
}
-// TimeOrDuration helper for parsing TimeOrDuration with kingpin
+// TimeOrDuration helper for parsing TimeOrDuration with kingpin.
func TimeOrDuration(flags *kingpin.FlagClause) *TimeOrDurationValue {
value := new(TimeOrDurationValue)
flags.SetValue(value)
From b211b4588ecfb57c40adc7c03d2c19b05e7ca073 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Mon, 20 May 2019 08:58:03 +0300
Subject: [PATCH 19/22] Add block end time filters
---
cmd/thanos/store.go | 38 +++++++++++++++++++++++++-----------
docs/components/store.md | 19 ++++++++++++++++--
pkg/store/bucket.go | 33 +++++++++++++++++++++----------
pkg/store/bucket_e2e_test.go | 19 +++++++++++++++---
pkg/store/bucket_test.go | 17 ++++++++--------
5 files changed, 92 insertions(+), 34 deletions(-)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index cd79620194..271e78ffe4 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -55,16 +55,30 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()
- minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have start time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ minBlockStartTime := model.TimeOrDuration(cmd.Flag("--min-block-start-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have start time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
- maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which have start time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ maxBlockStartTime := model.TimeOrDuration(cmd.Flag("--max-block-start-time", "End of time range limit to serve. Thanos Store serves only blocks, which have start time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ Default("9999-12-31T23:59:59Z"))
+
+ minBlockEndTime := model.TimeOrDuration(cmd.Flag("--min-block-end-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have end time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ Default("0000-01-01T00:00:00Z"))
+
+ maxBlockEndTime := model.TimeOrDuration(cmd.Flag("--max-block-end-time", "End of time range limit to serve. Thanos Store serves only blocks, which have end time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
- // Sanity check minTime.
- if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
- return errors.Errorf("invalid argument: min-time '%s' can't be greater than max-time '%s'", minTime, maxTime)
+ // Sanity check Time limits
+ switch {
+ case minBlockStartTime.PrometheusTimestamp() > maxBlockStartTime.PrometheusTimestamp():
+ return errors.Errorf("invalid argument: --min-block-start-time '%s' can't be greater than --max-block-start-time '%s'", minBlockStartTime, maxBlockStartTime)
+ case minBlockEndTime.PrometheusTimestamp() > maxBlockEndTime.PrometheusTimestamp():
+ return errors.Errorf("invalid argument: --min-block-end-time '%s' can't be greater than --max-block-end-time '%s'", minBlockStartTime, maxBlockStartTime)
+
+ case minBlockStartTime.PrometheusTimestamp() > maxBlockEndTime.PrometheusTimestamp():
+ return errors.Errorf("invalid argument: --min-block-start-time '%s' can't be greater than --max-block-end-time '%s'", minBlockStartTime, maxBlockStartTime)
+ case minBlockEndTime.PrometheusTimestamp() > maxBlockStartTime.PrometheusTimestamp():
+ return errors.Errorf("invalid argument: --min-block-end-time '%s' can't be greater than --max-block-start-time '%s'", minBlockStartTime, maxBlockStartTime)
}
return runStore(g,
@@ -86,8 +100,12 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
debugLogging,
*syncInterval,
*blockSyncConcurrency,
- minTime,
- maxTime,
+ &store.BlockFilterConfig{
+ MinBlockStartTime: *minBlockStartTime,
+ MaxBlockStartTime: *maxBlockStartTime,
+ MinBlockEndTime: *minBlockEndTime,
+ MaxBlockEndTime: *maxBlockEndTime,
+ },
)
}
}
@@ -113,8 +131,7 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
- minTime *model.TimeOrDurationValue,
- maxTime *model.TimeOrDurationValue,
+ blockFilterConf *store.BlockFilterConfig,
) error {
{
confContentYaml, err := objStoreConfig.Content()
@@ -156,8 +173,7 @@ func runStore(
maxConcurrent,
verbose,
blockSyncConcurrency,
- minTime,
- maxTime,
+ blockFilterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
diff --git a/docs/components/store.md b/docs/components/store.md
index c8f3bb0ca6..d57d33d6bb 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -84,7 +84,7 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing blocks
from object storage.
- --min-time=0000-01-01T00:00:00Z
+ ----min-block-start-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
Store serves only blocks, which have start time
greater than this value. Option can be a
@@ -92,7 +92,7 @@ Flags:
duration relative to current time, such as
-1.5d or 2h45m. Valid duration units are ms, s,
m, h, d, w, y.
- --max-time=9999-12-31T23:59:59Z
+ ----max-block-start-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
serves only blocks, which have start time is
less than this value. Option can be a constant
@@ -100,6 +100,21 @@ Flags:
relative to current time, such as -1.5d or
2h45m. Valid duration units are ms, s, m, h, d,
w, y.
+ ----min-block-end-time=0000-01-01T00:00:00Z
+ Start of time range limit to serve. Thanos
+ Store serves only blocks, which have end time
+ greater than this value. Option can be a
+ constant time in RFC3339 format or time
+ duration relative to current time, such as
+ -1.5d or 2h45m. Valid duration units are ms, s,
+ m, h, d, w, y.
+ ----max-block-end-time=9999-12-31T23:59:59Z
+ End of time range limit to serve. Thanos Store
+ serves only blocks, which have end time is less
+ than this value. Option can be a constant time
+ in RFC3339 format or time duration relative to
+ current time, such as -1.5d or 2h45m. Valid
+ duration units are ms, s, m, h, d, w, y.
```
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 7e27adc56b..b1929d13b2 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -195,6 +195,12 @@ type indexCache interface {
Series(b ulid.ULID, id uint64) ([]byte, bool)
}
+// BlockFilterConfig is a configiration, which Store uses for filtering blocks.
+type BlockFilterConfig struct {
+ MinBlockStartTime, MaxBlockStartTime model.TimeOrDurationValue
+ MinBlockEndTime, MaxBlockEndTime model.TimeOrDurationValue
+}
+
// BucketStore implements the store API backed by a bucket. It loads all index
// files to local disk.
type BucketStore struct {
@@ -222,8 +228,7 @@ type BucketStore struct {
samplesLimiter *Limiter
partitioner partitioner
- minTime *model.TimeOrDurationValue
- maxTime *model.TimeOrDurationValue
+ blockFilterConf *BlockFilterConfig
}
// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -239,8 +244,7 @@ func NewBucketStore(
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
- minTime *model.TimeOrDurationValue,
- maxTime *model.TimeOrDurationValue,
+ blockFilterConf *BlockFilterConfig,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -273,10 +277,9 @@ func NewBucketStore(
maxConcurrent,
extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg),
),
- samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
- partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
- minTime: minTime,
- maxTime: maxTime,
+ samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
+ partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
+ blockFilterConf: blockFilterConf,
}
if err := os.MkdirAll(dir, 0777); err != nil {
@@ -421,8 +424,18 @@ func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (b
return false, err
}
- // We check for blocks in configured minTime, maxTime range.
- if b.meta.MinTime < s.minTime.PrometheusTimestamp() || b.meta.MinTime > s.maxTime.PrometheusTimestamp() {
+ // We check for blocks in configured minTime, maxTime range
+ switch {
+ case b.meta.MinTime < s.blockFilterConf.MinBlockStartTime.PrometheusTimestamp():
+ return false, nil
+
+ case b.meta.MinTime > s.blockFilterConf.MaxBlockStartTime.PrometheusTimestamp():
+ return false, nil
+
+ case b.meta.MaxTime < s.blockFilterConf.MinBlockEndTime.PrometheusTimestamp():
+ return false, nil
+
+ case b.meta.MaxTime > s.blockFilterConf.MaxBlockEndTime.PrometheusTimestamp():
return false, nil
}
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 95e08252bf..3e0e893f3e 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -9,8 +9,6 @@ import (
"testing"
"time"
- "github.com/oklog/ulid"
-
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
@@ -20,6 +18,7 @@ import (
storecache "github.com/improbable-eng/thanos/pkg/store/cache"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/testutil"
+ "github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb/labels"
@@ -33,6 +32,20 @@ import (
"github.com/thanos-io/thanos/pkg/testutil"
)
+var (
+ minTime = time.Unix(0, 0)
+ maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
+ minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
+ maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}
+ blockFilterConf = &BlockFilterConfig{
+ MinBlockStartTime: minTimeDuration,
+ MaxBlockStartTime: maxTimeDuration,
+ MinBlockEndTime: minTimeDuration,
+ MaxBlockEndTime: maxTimeDuration,
+ }
+>>>>>>> Add block end time filters
+)
+
type noopCache struct{}
func (noopCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {}
@@ -143,7 +156,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
testutil.Ok(t, os.RemoveAll(dir2))
}
- store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, minTimeDuration, maxTimeDuration)
+ store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, blockFilterConf)
testutil.Ok(t, err)
s.store = store
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 2141e6238d..95ff98b8b0 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -426,8 +426,7 @@ func TestBucketStore_Info(t *testing.T) {
dir, err := ioutil.TempDir("", "prometheus-test")
testutil.Ok(t, err)
- bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, minTimeDuration, maxTimeDuration)
-
+ bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, blockFilterConf)
testutil.Ok(t, err)
resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
@@ -477,14 +476,17 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta2))
// Run actual test
- zeroTime := time.Unix(0, 0)
- hourBefore := prommodel.Duration(-1 * time.Hour)
- minTime := &model.TimeOrDurationValue{Time: &zeroTime}
- maxTime := &model.TimeOrDurationValue{Dur: &hourBefore}
+ hourBeforeDur := prommodel.Duration(-1 * time.Hour)
+ hourBefore := model.TimeOrDurationValue{Dur: &hourBeforeDur}
// bucketStore accepts blocks in range [0, now-1h]
bucketStore, err := NewBucketStore(nil, nil, inmem.NewBucket(), dir, noopCache{}, 0, 0, 20, false, 20,
- minTime, maxTime)
+ &BlockFilterConfig{
+ MinBlockStartTime: minTimeDuration,
+ MaxBlockStartTime: hourBefore,
+ MinBlockEndTime: minTimeDuration,
+ MaxBlockEndTime: maxTimeDuration,
+ })
inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1)
testutil.Ok(t, err)
@@ -495,5 +497,4 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id3)
testutil.Equals(t, false, inRange)
-
}
From 96d81c6acc8a5d894e84402fb296deccbc852804 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Mon, 20 May 2019 10:03:10 +0300
Subject: [PATCH 20/22] Fix flag names
---
cmd/thanos/store.go | 8 ++++----
docs/components/store.md | 6 +++---
2 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 271e78ffe4..aa75dc1dff 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -55,20 +55,20 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()
- minBlockStartTime := model.TimeOrDuration(cmd.Flag("--min-block-start-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have start time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ minBlockStartTime := model.TimeOrDuration(cmd.Flag("min-block-start-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have start time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
maxBlockStartTime := model.TimeOrDuration(cmd.Flag("--max-block-start-time", "End of time range limit to serve. Thanos Store serves only blocks, which have start time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
- minBlockEndTime := model.TimeOrDuration(cmd.Flag("--min-block-end-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have end time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ minBlockEndTime := model.TimeOrDuration(cmd.Flag("min-block-end-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have end time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
- maxBlockEndTime := model.TimeOrDuration(cmd.Flag("--max-block-end-time", "End of time range limit to serve. Thanos Store serves only blocks, which have end time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ maxBlockEndTime := model.TimeOrDuration(cmd.Flag("max-block-end-time", "End of time range limit to serve. Thanos Store serves only blocks, which have end time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
- // Sanity check Time limits
+ // Sanity check Time filters
switch {
case minBlockStartTime.PrometheusTimestamp() > maxBlockStartTime.PrometheusTimestamp():
return errors.Errorf("invalid argument: --min-block-start-time '%s' can't be greater than --max-block-start-time '%s'", minBlockStartTime, maxBlockStartTime)
diff --git a/docs/components/store.md b/docs/components/store.md
index d57d33d6bb..175379e529 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -84,7 +84,7 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing blocks
from object storage.
- ----min-block-start-time=0000-01-01T00:00:00Z
+ --min-block-start-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
Store serves only blocks, which have start time
greater than this value. Option can be a
@@ -100,7 +100,7 @@ Flags:
relative to current time, such as -1.5d or
2h45m. Valid duration units are ms, s, m, h, d,
w, y.
- ----min-block-end-time=0000-01-01T00:00:00Z
+ --min-block-end-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
Store serves only blocks, which have end time
greater than this value. Option can be a
@@ -108,7 +108,7 @@ Flags:
duration relative to current time, such as
-1.5d or 2h45m. Valid duration units are ms, s,
m, h, d, w, y.
- ----max-block-end-time=9999-12-31T23:59:59Z
+ --max-block-end-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
serves only blocks, which have end time is less
than this value. Option can be a constant time
From ffa5baa92b276691b4ff862e333dd851b9da1e83 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Tue, 21 May 2019 06:46:15 +0300
Subject: [PATCH 21/22] Improve docs
---
cmd/thanos/store.go | 9 ++-------
docs/components/store.md | 22 ++++++++++++++++------
pkg/compact/compact_test.go | 2 --
pkg/model/timeduration_test.go | 4 ++--
pkg/store/bucket.go | 14 +-------------
pkg/store/bucket_e2e_test.go | 10 +---------
pkg/store/bucket_test.go | 8 ++------
7 files changed, 24 insertions(+), 45 deletions(-)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index aa75dc1dff..60971f675a 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -7,16 +7,11 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
- "github.com/improbable-eng/thanos/pkg/model"
- "github.com/improbable-eng/thanos/pkg/objstore/client"
- "github.com/improbable-eng/thanos/pkg/runutil"
- "github.com/improbable-eng/thanos/pkg/store"
- storecache "github.com/improbable-eng/thanos/pkg/store/cache"
- "github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
@@ -58,7 +53,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
minBlockStartTime := model.TimeOrDuration(cmd.Flag("min-block-start-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have start time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
- maxBlockStartTime := model.TimeOrDuration(cmd.Flag("--max-block-start-time", "End of time range limit to serve. Thanos Store serves only blocks, which have start time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ maxBlockStartTime := model.TimeOrDuration(cmd.Flag("max-block-start-time", "End of time range limit to serve. Thanos Store serves only blocks, which have start time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
minBlockEndTime := model.TimeOrDuration(cmd.Flag("min-block-end-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have end time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
diff --git a/docs/components/store.md b/docs/components/store.md
index 175379e529..3b45755b36 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -92,7 +92,7 @@ Flags:
duration relative to current time, such as
-1.5d or 2h45m. Valid duration units are ms, s,
m, h, d, w, y.
- ----max-block-start-time=9999-12-31T23:59:59Z
+ --max-block-start-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
serves only blocks, which have start time is
less than this value. Option can be a constant
@@ -121,13 +121,23 @@ Flags:
## Time & Duration based partioning
By default Thanos Store Gateway looks at all the data in Object Store and returns it based on query's time range.
-You can shard Thanos Store gateway based on time or duration relative to current time.
-For example setting: `--min-time=-6w` & `--max-time=-2w` will make Thanos Store Gateway look at blocks that fall within `now - 6 weeks` up to `now - 2 weeks`.
+There is a block syncing job, which synchronizes local state with remote storage. You can configure how often it runs via `--sync-block-duration=3m`. In most cases default should work well.
-You can also set constant time in RFC3339 format. For example `--min-time=2018-01-01T00:00:00Z`, `--max-time=2019-01-01T23:59:59Z`.
-There is a sync-block job, which syncs up with remote storage and filters out blocks. You can configure how often it runs via `--sync-block-duration=3m`. In most cases default should work well.
+Recently Thanos Store introduced `--min-block-start-time`, `--max-block-start-time`, `--min-block-end-time`,`--max-block-end-time` flags, that allows you to shard Thanos Store based on constant time or duration relative to current time.
+The `{min,max}-block-start-time` options only look at block's start time and `{min,max}-block-end-time` only at the end time.
-Note that Thanos Store Gateway only looks at block's start time. Therefore if block's start time is less than `--min-time`, the block won't be included as well as if it is more than `--max-time`.
+For example setting: `--min-block-start-time=-6w` & `--max-block-start-time==-2w` will make Thanos Store Gateway look at blocks that fall within `now - 6 weeks` up to `now - 2 weeks` time range.
+You can also set constant time in RFC3339 format. For example `--min-block-start-time=2018-01-01T00:00:00Z`, `--max-block-start-time=2019-01-01T23:59:59Z`.
+
+The block filtering is done in Thanos Store's syncing job, which adds some delay, so Thanos Store might not see new blocks or filter out blocks immediately.
+
+We recommend having overlapping time ranges with Thanos Sidecar and other Thanos Store gateways as this improves your resiliency to failures.
+A lot of Object Store implementations provide eventual read-after-write consistency, which means that Thanos Store won't immediately see newly created & uploaded blocks.
+Also Thanos Sidecar might fail to upload new blocks to Object Store due to network timeouts or Object Store downtime, so if your time ranges are too strict, you won't be able to query the data.
+
+Thanos Querier deals with overlapping time series by merging them together. It's important to note, that by having more overlapping time series Thanos Querier will use more resources, like CPU, network & memory, as it will have to pull down all the overlapping blocks and merge them.
+
+When configuring time partitioning keep in mind Thanos Compaction, as it builds bigger blocks and removes data based on your retention policies.
diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go
index 20989fd253..4148f5a8dd 100644
--- a/pkg/compact/compact_test.go
+++ b/pkg/compact/compact_test.go
@@ -7,8 +7,6 @@ import (
"testing"
"time"
- "github.com/improbable-eng/thanos/pkg/objstore/inmem"
- "github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
terrors "github.com/prometheus/tsdb/errors"
diff --git a/pkg/model/timeduration_test.go b/pkg/model/timeduration_test.go
index b00a491b5f..d3d3eba271 100644
--- a/pkg/model/timeduration_test.go
+++ b/pkg/model/timeduration_test.go
@@ -4,9 +4,9 @@ import (
"testing"
"time"
- "github.com/improbable-eng/thanos/pkg/model"
- "github.com/improbable-eng/thanos/pkg/testutil"
"github.com/prometheus/prometheus/pkg/timestamp"
+ "github.com/thanos-io/thanos/pkg/model"
+ "github.com/thanos-io/thanos/pkg/testutil"
"gopkg.in/alecthomas/kingpin.v2"
)
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index b1929d13b2..94fa78bb2a 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -18,19 +18,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
- "github.com/improbable-eng/thanos/pkg/block"
- "github.com/improbable-eng/thanos/pkg/block/metadata"
- "github.com/improbable-eng/thanos/pkg/compact/downsample"
- "github.com/improbable-eng/thanos/pkg/component"
- "github.com/improbable-eng/thanos/pkg/extprom"
- "github.com/improbable-eng/thanos/pkg/model"
- "github.com/improbable-eng/thanos/pkg/objstore"
- "github.com/improbable-eng/thanos/pkg/pool"
- "github.com/improbable-eng/thanos/pkg/runutil"
- "github.com/improbable-eng/thanos/pkg/store/storepb"
- "github.com/improbable-eng/thanos/pkg/strutil"
- "github.com/improbable-eng/thanos/pkg/tracing"
->>>>>>> Refactor flag into separate package
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/pkg/errors"
@@ -45,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
+ "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/runutil"
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 3e0e893f3e..7c9721a724 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -10,20 +10,13 @@ import (
"time"
"github.com/go-kit/kit/log"
- "github.com/improbable-eng/thanos/pkg/block"
- "github.com/improbable-eng/thanos/pkg/block/metadata"
- "github.com/improbable-eng/thanos/pkg/objstore"
- "github.com/improbable-eng/thanos/pkg/objstore/objtesting"
- "github.com/improbable-eng/thanos/pkg/runutil"
- storecache "github.com/improbable-eng/thanos/pkg/store/cache"
- "github.com/improbable-eng/thanos/pkg/store/storepb"
- "github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
+ "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
"github.com/thanos-io/thanos/pkg/runutil"
@@ -43,7 +36,6 @@ var (
MinBlockEndTime: minTimeDuration,
MaxBlockEndTime: maxTimeDuration,
}
->>>>>>> Add block end time filters
)
type noopCache struct{}
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 95ff98b8b0..319df4fe92 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -10,12 +10,6 @@ import (
"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
- "github.com/improbable-eng/thanos/pkg/block/metadata"
- "github.com/improbable-eng/thanos/pkg/compact/downsample"
- "github.com/improbable-eng/thanos/pkg/model"
- "github.com/improbable-eng/thanos/pkg/objstore/inmem"
- "github.com/improbable-eng/thanos/pkg/store/storepb"
- "github.com/improbable-eng/thanos/pkg/testutil"
"github.com/leanovate/gopter"
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
@@ -25,6 +19,8 @@ import (
"github.com/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
+ "github.com/thanos-io/thanos/pkg/model"
+ "github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)
From e1a2fb255ebc3c6c400bf28e729173be7a675406 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Tue, 13 Aug 2019 10:01:05 +0300
Subject: [PATCH 22/22] Refactor to --min-time & --max-time
---
cmd/thanos/store.go | 33 ++++++++------------------
pkg/store/bucket.go | 56 +++++++++++++++++++++++++++++++--------------
2 files changed, 48 insertions(+), 41 deletions(-)
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 60971f675a..79e56e964e 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -50,30 +50,17 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()
- minBlockStartTime := model.TimeOrDuration(cmd.Flag("min-block-start-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have start time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
- maxBlockStartTime := model.TimeOrDuration(cmd.Flag("max-block-start-time", "End of time range limit to serve. Thanos Store serves only blocks, which have start time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
- Default("9999-12-31T23:59:59Z"))
-
- minBlockEndTime := model.TimeOrDuration(cmd.Flag("min-block-end-time", "Start of time range limit to serve. Thanos Store serves only blocks, which have end time greater than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
- Default("0000-01-01T00:00:00Z"))
-
- maxBlockEndTime := model.TimeOrDuration(cmd.Flag("max-block-end-time", "End of time range limit to serve. Thanos Store serves only blocks, which have end time is less than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1.5d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
// Sanity check Time filters
- switch {
- case minBlockStartTime.PrometheusTimestamp() > maxBlockStartTime.PrometheusTimestamp():
- return errors.Errorf("invalid argument: --min-block-start-time '%s' can't be greater than --max-block-start-time '%s'", minBlockStartTime, maxBlockStartTime)
- case minBlockEndTime.PrometheusTimestamp() > maxBlockEndTime.PrometheusTimestamp():
- return errors.Errorf("invalid argument: --min-block-end-time '%s' can't be greater than --max-block-end-time '%s'", minBlockStartTime, maxBlockStartTime)
-
- case minBlockStartTime.PrometheusTimestamp() > maxBlockEndTime.PrometheusTimestamp():
- return errors.Errorf("invalid argument: --min-block-start-time '%s' can't be greater than --max-block-end-time '%s'", minBlockStartTime, maxBlockStartTime)
- case minBlockEndTime.PrometheusTimestamp() > maxBlockStartTime.PrometheusTimestamp():
- return errors.Errorf("invalid argument: --min-block-end-time '%s' can't be greater than --max-block-start-time '%s'", minBlockStartTime, maxBlockStartTime)
+ if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
+ return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
+ minTime, maxTime)
}
return runStore(g,
@@ -95,11 +82,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
debugLogging,
*syncInterval,
*blockSyncConcurrency,
- &store.BlockFilterConfig{
- MinBlockStartTime: *minBlockStartTime,
- MaxBlockStartTime: *maxBlockStartTime,
- MinBlockEndTime: *minBlockEndTime,
- MaxBlockEndTime: *maxBlockEndTime,
+ &store.FilterConfig{
+ MinTime: *minTime,
+ MaxTime: *maxTime,
},
)
}
@@ -126,7 +111,7 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
- blockFilterConf *store.BlockFilterConfig,
+ blockFilterConf *store.FilterConfig,
) error {
{
confContentYaml, err := objStoreConfig.Content()
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 94fa78bb2a..3efa393619 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -183,10 +183,9 @@ type indexCache interface {
Series(b ulid.ULID, id uint64) ([]byte, bool)
}
-// BlockFilterConfig is a configiration, which Store uses for filtering blocks.
-type BlockFilterConfig struct {
- MinBlockStartTime, MaxBlockStartTime model.TimeOrDurationValue
- MinBlockEndTime, MaxBlockEndTime model.TimeOrDurationValue
+// FilterConfig is a configiration, which Store uses for filtering metrics.
+type FilterConfig struct {
+ MinTime, MaxTime model.TimeOrDurationValue
}
// BucketStore implements the store API backed by a bucket. It loads all index
@@ -216,7 +215,7 @@ type BucketStore struct {
samplesLimiter *Limiter
partitioner partitioner
- blockFilterConf *BlockFilterConfig
+ filterConfig *FilterConfig
}
// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -232,7 +231,7 @@ func NewBucketStore(
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
- blockFilterConf *BlockFilterConfig,
+ filterConf *FilterConfig,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -265,9 +264,9 @@ func NewBucketStore(
maxConcurrent,
extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg),
),
- samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
- partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
- blockFilterConf: blockFilterConf,
+ samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
+ partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
+ filterConfig: filterConf,
}
if err := os.MkdirAll(dir, 0777); err != nil {
@@ -414,16 +413,10 @@ func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (b
// We check for blocks in configured minTime, maxTime range
switch {
- case b.meta.MinTime < s.blockFilterConf.MinBlockStartTime.PrometheusTimestamp():
+ case b.meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp():
return false, nil
- case b.meta.MinTime > s.blockFilterConf.MaxBlockStartTime.PrometheusTimestamp():
- return false, nil
-
- case b.meta.MaxTime < s.blockFilterConf.MinBlockEndTime.PrometheusTimestamp():
- return false, nil
-
- case b.meta.MaxTime > s.blockFilterConf.MaxBlockEndTime.PrometheusTimestamp():
+ case b.meta.MinTime >= s.filterConfig.MaxTime.PrometheusTimestamp():
return false, nil
}
@@ -522,9 +515,33 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
maxt = b.meta.MaxTime
}
}
+
+ mint = s.normalizeMinTime(mint)
+ maxt = s.normalizeMaxTime(maxt)
+
return mint, maxt
}
+func (s *BucketStore) normalizeMinTime(mint int64) int64 {
+ filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp()
+
+ if mint < filterMinTime {
+ return filterMinTime
+ }
+
+ return mint
+}
+
+func (s *BucketStore) normalizeMaxTime(maxt int64) int64 {
+ filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp()
+
+ if maxt > filterMaxTime {
+ maxt = filterMaxTime
+ }
+
+ return maxt
+}
+
// Info implements the storepb.StoreServer interface.
func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
mint, maxt := s.TimeRange()
@@ -784,11 +801,16 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
)
s.mtx.RLock()
+ // Adjust Request MinTime based on filters.
+ req.MinTime = s.normalizeMinTime(req.MinTime)
+ req.MaxTime = s.normalizeMaxTime(req.MaxTime)
+
for _, bs := range s.blockSets {
blockMatchers, ok := bs.labelMatchers(matchers...)
if !ok {
continue
}
+
blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow)
if s.debugLogging {