From 1af150bd651f07fc32ee177b8028c7b352a075ca Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 6 Jan 2025 17:44:24 -0800 Subject: [PATCH 1/9] Add option to cache matcher on the get series call Signed-off-by: alanprot --- cmd/thanos/receive.go | 2 +- pkg/receive/multitsdb.go | 2 +- pkg/store/bucket.go | 21 +++++++++++++++------ pkg/store/bucket_test.go | 4 ++++ pkg/store/cache/matchers_cache.go | 15 +++++++-------- pkg/store/local.go | 2 +- pkg/store/prometheus.go | 6 +++--- pkg/store/proxy.go | 2 +- pkg/store/proxy_test.go | 6 +++--- pkg/store/tsdb.go | 2 +- 10 files changed, 37 insertions(+), 25 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 541d5371a8..559d0af7c5 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -225,7 +225,7 @@ func runReceive( return errors.Wrap(err, "parse relabel configuration") } - var cache = storecache.NewNoopMatcherCache() + var cache = storecache.NoopMatchersCache if conf.matcherCacheSize > 0 { cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg)) if err != nil { diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 96e4e43345..8b89e0c1a1 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -134,7 +134,7 @@ func NewMultiTSDB( bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, hashFunc: hashFunc, - matcherCache: storecache.NewNoopMatcherCache(), + matcherCache: storecache.NoopMatchersCache, } for _, option := range options { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index efc2a4b392..16e9e8c39d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -400,6 +400,7 @@ type BucketStore struct { fetcher block.MetadataFetcher dir string indexCache storecache.IndexCache + matcherCache storecache.MatchersCache indexReaderPool *indexheader.ReaderPool buffers sync.Pool chunkPool pool.Pool[byte] @@ -517,6 +518,13 @@ func WithIndexCache(cache storecache.IndexCache) 
BucketStoreOption { } } +// WithMatchersCache sets a matchers cache to use instead of a noopCache. +func WithMatchersCache(cache storecache.MatchersCache) BucketStoreOption { + return func(s *BucketStore) { + s.matcherCache = cache + } +} + // WithQueryGate sets a queryGate to use instead of a noopGate. func WithQueryGate(queryGate gate.Gate) BucketStoreOption { return func(s *BucketStore) { @@ -637,11 +645,12 @@ func NewBucketStore( options ...BucketStoreOption, ) (*BucketStore, error) { s := &BucketStore{ - logger: log.NewNopLogger(), - bkt: bkt, - fetcher: fetcher, - dir: dir, - indexCache: noopCache{}, + logger: log.NewNopLogger(), + bkt: bkt, + fetcher: fetcher, + dir: dir, + indexCache: noopCache{}, + matcherCache: storecache.NoopMatchersCache, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) return &b @@ -1536,7 +1545,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store tenant, _ := tenancy.GetTenantFromGRPCMetadata(srv.Context()) - matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) + matchers, err := storecache.MatchersToPromMatchersCached(s.matcherCache, req.Matchers...) 
if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 0fe173efa4..2ba4544ef0 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2080,6 +2080,9 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{}) testutil.Ok(tb, err) + matcherCache, err := storecache.NewMatchersCache(storecache.WithSize(1024)) + testutil.Ok(tb, err) + store, err := NewBucketStore( instrBkt, fetcher, @@ -2096,6 +2099,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { 0, WithLogger(logger), WithIndexCache(indexCache), + WithMatchersCache(matcherCache), ) testutil.Ok(tb, err) testutil.Ok(tb, store.SyncBlocks(context.Background())) diff --git a/pkg/store/cache/matchers_cache.go b/pkg/store/cache/matchers_cache.go index 02aa06ca65..8bdee29aab 100644 --- a/pkg/store/cache/matchers_cache.go +++ b/pkg/store/cache/matchers_cache.go @@ -25,20 +25,18 @@ type MatchersCache interface { // Ensure implementations satisfy the interface. var ( - _ MatchersCache = (*LruMatchersCache)(nil) - _ MatchersCache = (*NoopMatcherCache)(nil) + _ MatchersCache = (*LruMatchersCache)(nil) + NoopMatchersCache MatchersCache = &noopMatcherCache{} ) -// NoopMatcherCache is a no-op implementation of MatchersCache that doesn't cache anything. -type NoopMatcherCache struct{} +type noopMatcherCache struct{} -// NewNoopMatcherCache creates a new no-op matcher cache. -func NewNoopMatcherCache() MatchersCache { - return &NoopMatcherCache{} +func newNoopMatcherCache() MatchersCache { + return &noopMatcherCache{} } // GetOrSet implements MatchersCache by always creating a new matcher without caching. 
-func (n *NoopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) { +func (n *noopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) { return newItem() } @@ -74,6 +72,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) { opt(cache) } cache.metrics = newMatcherCacheMetrics(cache.reg) + cache.metrics.maxItems.Set(float64(cache.size)) lruCache, err := lru.NewWithEvict[string, *labels.Matcher](cache.size, cache.onEvict) if err != nil { diff --git a/pkg/store/local.go b/pkg/store/local.go index 5d72ee28af..a397b49cfb 100644 --- a/pkg/store/local.go +++ b/pkg/store/local.go @@ -131,7 +131,7 @@ func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, toke // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storecache.NewNoopMatcherCache()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storecache.NoopMatchersCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index a503d15689..3c30d70e8e 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -126,7 +126,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NoopMatchersCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -543,7 +543,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin func (p *PrometheusStore) LabelNames(ctx context.Context, 
r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NoopMatchersCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -606,7 +606,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NoopMatchersCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index af1ba9dae1..4ee987df29 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -177,7 +177,7 @@ func NewProxyStore( retrievalStrategy: retrievalStrategy, tsdbSelector: DefaultSelector, enableDedup: true, - matcherCache: storecache.NewNoopMatcherCache(), + matcherCache: storecache.NoopMatchersCache, } for _, option := range options { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index a9db1d11a0..dd0ac551ff 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -2175,7 +2175,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { responseTimeout: 5 * time.Second, retrievalStrategy: EagerRetrieval, tsdbSelector: DefaultSelector, - matcherCache: storecache.NewNoopMatcherCache(), + matcherCache: storecache.NoopMatchersCache, } var allResps []*storepb.SeriesResponse @@ -2312,7 +2312,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { responseTimeout: 50 * time.Millisecond, retrievalStrategy: respStrategy, tsdbSelector: DefaultSelector, - matcherCache: storecache.NewNoopMatcherCache(), + matcherCache: storecache.NoopMatchersCache, } ctx, cancel := 
context.WithCancel(context.Background()) @@ -2350,7 +2350,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { responseTimeout: 50 * time.Millisecond, retrievalStrategy: respStrategy, tsdbSelector: DefaultSelector, - matcherCache: storecache.NewNoopMatcherCache(), + matcherCache: storecache.NoopMatchersCache, } ctx := context.Background() diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index a62481a53f..d1d88cb92c 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -120,7 +120,7 @@ func NewTSDBStore( b := make([]byte, 0, initialBufSize) return &b }}, - matcherCache: storecache.NewNoopMatcherCache(), + matcherCache: storecache.NoopMatchersCache, } for _, option := range options { From cccb0452b4874228140fc2d49e3c8090699434ab Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 6 Jan 2025 17:52:07 -0800 Subject: [PATCH 2/9] Adding matcher cache option for the store gateway Signed-off-by: alanprot --- cmd/thanos/store.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 1cdc12679c..71a33d5520 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -104,6 +104,8 @@ type storeConfig struct { postingGroupMaxKeySeriesRatio float64 indexHeaderLazyDownloadStrategy string + + matcherCacheSize int } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -225,6 +227,8 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&sc.label) + cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. 
Using 0 disables caching.").Default("0").IntVar(&sc.matcherCacheSize) + sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) } @@ -368,6 +372,14 @@ func runStore( return errors.Wrap(err, "create index cache") } + var matchersCache = storecache.NoopMatchersCache + if conf.matcherCacheSize > 0 { + matchersCache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg)) + if err != nil { + return errors.Wrap(err, "failed to create matchers cache") + } + } + var blockLister block.Lister switch syncStrategy(conf.blockListStrategy) { case concurrentDiscovery: @@ -413,6 +425,7 @@ func runStore( }), store.WithRegistry(reg), store.WithIndexCache(indexCache), + store.WithMatchersCache(matchersCache), store.WithQueryGate(queriesGate), store.WithChunkPool(chunkPool), store.WithFilterConfig(conf.filterConf), From 3e2e473e0b7244477e3aa2a90876f0d0cf39b999 Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 6 Jan 2025 17:59:11 -0800 Subject: [PATCH 3/9] lint/docs Signed-off-by: alanprot --- docs/components/store.md | 2 ++ pkg/store/cache/matchers_cache.go | 4 ---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/components/store.md b/docs/components/store.md index ce2adb6d6d..99f534ecaa 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -163,6 +163,8 @@ Flags: --log.format=logfmt Log format to use. Possible options: logfmt or json. --log.level=info Log filtering level. + --matcher-cache-size=0 The size of the cache used for matching against + external labels. Using 0 disables caching. --max-time=9999-12-31T23:59:59Z End of time range limit to serve. 
Thanos Store will serve only blocks, which happened earlier diff --git a/pkg/store/cache/matchers_cache.go b/pkg/store/cache/matchers_cache.go index 8bdee29aab..aefcf7e70c 100644 --- a/pkg/store/cache/matchers_cache.go +++ b/pkg/store/cache/matchers_cache.go @@ -31,10 +31,6 @@ var ( type noopMatcherCache struct{} -func newNoopMatcherCache() MatchersCache { - return &noopMatcherCache{} -} - // GetOrSet implements MatchersCache by always creating a new matcher without caching. func (n *noopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) { return newItem() From 362b3ce20fb6bbcf1101805e7cf44edbbc0e9745 Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 6 Jan 2025 18:08:00 -0800 Subject: [PATCH 4/9] change desc Signed-off-by: alanprot --- cmd/thanos/receive.go | 2 +- cmd/thanos/store.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 559d0af7c5..7bcb368510 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -1058,7 +1058,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { "about order."). Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload) - cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize) + cmd.Flag("matcher-cache-size", "The size of the cache used for caching matchers. 
Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize) rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 71a33d5520..b88d6001b4 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -227,7 +227,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&sc.label) - cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&sc.matcherCacheSize) + cmd.Flag("matcher-cache-size", "The size of the cache used for caching matchers. Using 0 disables caching.").Default("0").IntVar(&sc.matcherCacheSize) sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) } From 008bacb518ec6e27dd77aa42d91b63a83a031138 Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 6 Jan 2025 18:12:04 -0800 Subject: [PATCH 5/9] change desc Signed-off-by: alanprot --- cmd/thanos/receive.go | 2 +- cmd/thanos/store.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 7bcb368510..d29ab94728 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -1058,7 +1058,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { "about order."). Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload) - cmd.Flag("matcher-cache-size", "The size of the cache used for caching matchers. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize) + cmd.Flag("matcher-cache-size", "Max number of cached matchers items. 
Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize) rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index b88d6001b4..e3891215eb 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -227,7 +227,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&sc.label) - cmd.Flag("matcher-cache-size", "The size of the cache used for caching matchers. Using 0 disables caching.").Default("0").IntVar(&sc.matcherCacheSize) + cmd.Flag("matcher-cache-size", "Max number of cached matchers items. Using 0 disables caching.").Default("0").IntVar(&sc.matcherCacheSize) sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) } From 442b04a31f6ded76824c7dbac5b77fff82fa87af Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 6 Jan 2025 18:14:33 -0800 Subject: [PATCH 6/9] change desc Signed-off-by: alanprot --- docs/components/receive.md | 4 ++-- docs/components/store.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/components/receive.md b/docs/components/receive.md index 73ec0bbf61..be9332ee46 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -407,8 +407,8 @@ Flags: --log.format=logfmt Log format to use. Possible options: logfmt or json. --log.level=info Log filtering level. - --matcher-cache-size=0 The size of the cache used for matching against - external labels. Using 0 disables caching. + --matcher-cache-size=0 Max number of cached matchers items. Using 0 + disables caching. --objstore.config= Alternative to 'objstore.config-file' flag (mutually exclusive). Content of diff --git a/docs/components/store.md b/docs/components/store.md index 99f534ecaa..6be651dced 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -163,8 +163,8 @@ Flags: --log.format=logfmt Log format to use. 
Possible options: logfmt or json. --log.level=info Log filtering level. - --matcher-cache-size=0 The size of the cache used for matching against - external labels. Using 0 disables caching. + --matcher-cache-size=0 Max number of cached matchers items. Using 0 + disables caching. --max-time=9999-12-31T23:59:59Z End of time range limit to serve. Thanos Store will serve only blocks, which happened earlier From dc929f5fb95e12b077c95694a68b80ece8eb717b Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 7 Jan 2025 09:59:25 -0800 Subject: [PATCH 7/9] fix test Signed-off-by: alanprot --- pkg/store/bucket_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 2ba4544ef0..3938d06357 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1682,6 +1682,8 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { MaxSize: 8889, }) testutil.Ok(t, err) + matcherCache, err := storecache.NewMatchersCache(storecache.WithSize(10000)) + testutil.Ok(t, err) var b1 *bucketBlock @@ -1775,6 +1777,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { store := &BucketStore{ bkt: objstore.WithNoopInstr(bkt), logger: logger, + matcherCache: matcherCache, indexCache: indexCache, indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil), indexheader.AlwaysEagerDownloadIndexHeader), metrics: newBucketStoreMetrics(nil), From 602eda97459a2f2156363df49520573b79e2475f Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 7 Jan 2025 11:32:58 -0800 Subject: [PATCH 8/9] Caching only regex matchers Signed-off-by: alanprot --- pkg/store/cache/matchers_cache.go | 61 ++++++++- pkg/store/cache/matchers_cache_test.go | 165 ++++++++++++++----------- pkg/store/storepb/custom.go | 18 +++ 3 files changed, 166 insertions(+), 78 deletions(-) diff --git a/pkg/store/cache/matchers_cache.go b/pkg/store/cache/matchers_cache.go index 
aefcf7e70c..e3b18603a6 100644 --- a/pkg/store/cache/matchers_cache.go +++ b/pkg/store/cache/matchers_cache.go @@ -4,6 +4,8 @@ package storecache import ( + "strings" + lru "github.com/hashicorp/golang-lru/v2" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -17,22 +19,37 @@ const DefaultCacheSize = 200 type NewItemFunc func() (*labels.Matcher, error) +type ConversionLabelMatcher interface { + GetValue() string + GetName() string + MatcherType() (labels.MatchType, error) +} + type MatchersCache interface { // GetOrSet retrieves a matcher from cache or creates and stores it if not present. // If the matcher is not in cache, it uses the provided newItem function to create it. - GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error) + GetOrSet(m ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) } // Ensure implementations satisfy the interface. var ( _ MatchersCache = (*LruMatchersCache)(nil) NoopMatchersCache MatchersCache = &noopMatcherCache{} + + defaultIsCacheableFunc = func(m ConversionLabelMatcher) bool { + t, err := m.MatcherType() + if err != nil { + return false + } + + return t == labels.MatchRegexp || t == labels.MatchNotRegexp + } ) type noopMatcherCache struct{} // GetOrSet implements MatchersCache by always creating a new matcher without caching. -func (n *noopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) { +func (n *noopMatcherCache) GetOrSet(_ ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { return newItem() } @@ -43,6 +60,8 @@ type LruMatchersCache struct { metrics *matcherCacheMetrics size int sf singleflight.Group + + isCacheable func(matcher ConversionLabelMatcher) bool } type MatcherCacheOption func(*LruMatchersCache) @@ -59,9 +78,17 @@ func WithSize(size int) MatcherCacheOption { } } +// WithIsCacheableFunc sets the function that determines if the item should be cached or not. 
+func WithIsCacheableFunc(f func(matcher ConversionLabelMatcher) bool) MatcherCacheOption { + return func(c *LruMatchersCache) { + c.isCacheable = f + } +} + func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) { cache := &LruMatchersCache{ - size: DefaultCacheSize, + size: DefaultCacheSize, + isCacheable: defaultIsCacheableFunc, } for _, opt := range opts { @@ -79,8 +106,18 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) { return cache, nil } -func (c *LruMatchersCache) GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error) { +func (c *LruMatchersCache) GetOrSet(m ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { + if !c.isCacheable(m) { + return newItem() + } + c.metrics.requestsTotal.Inc() + key, err := cacheKey(m) + + if err != nil { + return nil, err + } + v, err, _ := c.sf.Do(key, func() (interface{}, error) { if item, ok := c.cache.Get(key); ok { c.metrics.hitsTotal.Inc() @@ -146,7 +183,7 @@ func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics { func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) { res := make([]*labels.Matcher, 0, len(ms)) for i := range ms { - pm, err := cache.GetOrSet(ms[i].String(), func() (*labels.Matcher, error) { return storepb.MatcherToPromMatcher(ms[i]) }) + pm, err := cache.GetOrSet(&ms[i], func() (*labels.Matcher, error) { return storepb.MatcherToPromMatcher(ms[i]) }) if err != nil { return nil, err } @@ -154,3 +191,17 @@ func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatche } return res, nil } + +func cacheKey(m ConversionLabelMatcher) (string, error) { + sb := strings.Builder{} + t, err := m.MatcherType() + if err != nil { + return "", err + } + typeStr := t.String() + sb.Grow(len(m.GetValue()) + len(m.GetName()) + len(typeStr)) + sb.WriteString(m.GetName()) + sb.WriteString(typeStr) + sb.WriteString(m.GetValue()) + return 
sb.String(), nil +} diff --git a/pkg/store/cache/matchers_cache_test.go b/pkg/store/cache/matchers_cache_test.go index f07ed3b0c3..f90da976e1 100644 --- a/pkg/store/cache/matchers_cache_test.go +++ b/pkg/store/cache/matchers_cache_test.go @@ -1,7 +1,7 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. -package storecache_test +package storecache import ( "testing" @@ -9,87 +9,106 @@ import ( "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/model/labels" - storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" ) func TestMatchersCache(t *testing.T) { - cache, err := storecache.NewMatchersCache(storecache.WithSize(2)) - testutil.Ok(t, err) - - matcher := &storepb.LabelMatcher{ - Type: storepb.LabelMatcher_EQ, - Name: "key", - Value: "val", - } - - matcher2 := &storepb.LabelMatcher{ - Type: storepb.LabelMatcher_RE, - Name: "key2", - Value: "val2|val3", - } - - matcher3 := &storepb.LabelMatcher{ - Type: storepb.LabelMatcher_EQ, - Name: "key3", - Value: "val3", + testCases := map[string]struct { + isCacheable func(matcher ConversionLabelMatcher) bool + }{ + "default": { + isCacheable: defaultIsCacheableFunc, + }, + "cache all items": { + isCacheable: func(matcher ConversionLabelMatcher) bool { + return true + }, + }, } - var cacheHit bool - newItem := func(matcher *storepb.LabelMatcher) func() (*labels.Matcher, error) { - return func() (*labels.Matcher, error) { - cacheHit = false - return storepb.MatcherToPromMatcher(*matcher) - } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + cache, err := NewMatchersCache( + WithSize(2), + WithIsCacheableFunc(tc.isCacheable), + ) + testutil.Ok(t, err) + + matcher := &storepb.LabelMatcher{ + Type: storepb.LabelMatcher_EQ, + Name: "key", + Value: "val", + } + + matcher2 := &storepb.LabelMatcher{ + Type: storepb.LabelMatcher_RE, + Name: "key2", + Value: "val2|val3", + } + + matcher3 := &storepb.LabelMatcher{ 
+ Type: storepb.LabelMatcher_EQ, + Name: "key3", + Value: "val3", + } + + var cacheHit bool + newItem := func(matcher *storepb.LabelMatcher) func() (*labels.Matcher, error) { + return func() (*labels.Matcher, error) { + cacheHit = false + return storepb.MatcherToPromMatcher(*matcher) + } + } + expected := labels.MustNewMatcher(labels.MatchEqual, "key", "val") + expected2 := labels.MustNewMatcher(labels.MatchRegexp, "key2", "val2|val3") + expected3 := labels.MustNewMatcher(labels.MatchEqual, "key3", "val3") + + item, err := cache.GetOrSet(matcher, newItem(matcher)) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher, newItem(matcher)) + testutil.Ok(t, err) + testutil.Equals(t, tc.isCacheable(matcher), cacheHit) + testutil.Equals(t, expected.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem(matcher2)) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem(matcher2)) + testutil.Ok(t, err) + testutil.Equals(t, tc.isCacheable(matcher2), cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher, newItem(matcher)) + testutil.Ok(t, err) + testutil.Equals(t, tc.isCacheable(matcher), cacheHit) + testutil.Equals(t, expected, item) + + cacheHit = true + item, err = cache.GetOrSet(matcher3, newItem(matcher3)) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected3, item) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem(matcher2)) + testutil.Ok(t, err) + testutil.Equals(t, tc.isCacheable(matcher2) && cache.cache.Len() < 2, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + }) } - expected := labels.MustNewMatcher(labels.MatchEqual, "key", "val") - expected2 := 
labels.MustNewMatcher(labels.MatchRegexp, "key2", "val2|val3") - expected3 := labels.MustNewMatcher(labels.MatchEqual, "key3", "val3") - - item, err := cache.GetOrSet(matcher.String(), newItem(matcher)) - testutil.Ok(t, err) - testutil.Equals(t, false, cacheHit) - testutil.Equals(t, expected.String(), item.String()) - - cacheHit = true - item, err = cache.GetOrSet(matcher.String(), newItem(matcher)) - testutil.Ok(t, err) - testutil.Equals(t, true, cacheHit) - testutil.Equals(t, expected.String(), item.String()) - - cacheHit = true - item, err = cache.GetOrSet(matcher2.String(), newItem(matcher2)) - testutil.Ok(t, err) - testutil.Equals(t, false, cacheHit) - testutil.Equals(t, expected2.String(), item.String()) - - cacheHit = true - item, err = cache.GetOrSet(matcher2.String(), newItem(matcher2)) - testutil.Ok(t, err) - testutil.Equals(t, true, cacheHit) - testutil.Equals(t, expected2.String(), item.String()) - - cacheHit = true - item, err = cache.GetOrSet(matcher.String(), newItem(matcher)) - testutil.Ok(t, err) - testutil.Equals(t, true, cacheHit) - testutil.Equals(t, expected, item) - - cacheHit = true - item, err = cache.GetOrSet(matcher3.String(), newItem(matcher3)) - testutil.Ok(t, err) - testutil.Equals(t, false, cacheHit) - testutil.Equals(t, expected3, item) - - cacheHit = true - item, err = cache.GetOrSet(matcher2.String(), newItem(matcher2)) - testutil.Ok(t, err) - testutil.Equals(t, false, cacheHit) - testutil.Equals(t, expected2.String(), item.String()) } func BenchmarkMatchersCache(b *testing.B) { - cache, err := storecache.NewMatchersCache(storecache.WithSize(100)) + cache, err := NewMatchersCache(WithSize(100)) if err != nil { b.Fatalf("failed to create cache: %v", err) } @@ -106,7 +125,7 @@ func BenchmarkMatchersCache(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { matcher := matchers[i%len(matchers)] - _, err := cache.GetOrSet(matcher.String(), func() (*labels.Matcher, error) { + _, err := cache.GetOrSet(matcher, func() 
(*labels.Matcher, error) { return storepb.MatcherToPromMatcher(*matcher) }) if err != nil { diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index b165d76fcc..7a7d76afcb 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -552,3 +552,21 @@ func (c *SeriesStatsCounter) Count(series *Series) { func (m *SeriesRequest) ToPromQL() string { return m.QueryHints.toPromQL(m.Matchers) } + +func (m *LabelMatcher) MatcherType() (labels.MatchType, error) { + var t labels.MatchType + switch m.Type { + case LabelMatcher_EQ: + t = labels.MatchEqual + case LabelMatcher_NEQ: + t = labels.MatchNotEqual + case LabelMatcher_RE: + t = labels.MatchRegexp + case LabelMatcher_NRE: + t = labels.MatchNotRegexp + default: + return 0, errors.Errorf("unrecognized label matcher type %d", m.Type) + } + + return t, nil +} From 2da263126ca2d61ebaef72bcded2fb673873a7cf Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 7 Jan 2025 13:53:57 -0800 Subject: [PATCH 9/9] changelog Signed-off-by: alanprot --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a65d6beb97..a9b7990e3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers. - [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons. 
- [#8000](https://github.com/thanos-io/thanos/pull/8000) Query: Bump promql-engine, pass partial response through options -- [#7353](https://github.com/thanos-io/thanos/pull/7353) Receiver: introduce optional cache for matchers in series calls. +- [#7353](https://github.com/thanos-io/thanos/pull/7353) [#8045](https://github.com/thanos-io/thanos/pull/8045) Receiver/StoreGateway: Add `--matcher-cache-size` option to enable caching for regex matchers in series calls. - [#8017](https://github.com/thanos-io/thanos/pull/8017) Store Gateway: Use native histogram for binary reader load and download duration and fixed download duration metric. #8017 ### Changed