Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Matcher cache/series #8045

Merged
merged 9 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var cache = storecache.NewNoopMatcherCache()
var cache = storecache.NoopMatchersCache
if conf.matcherCacheSize > 0 {
cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg))
if err != nil {
Expand Down Expand Up @@ -1058,7 +1058,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"about order.").
Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)

cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize)
cmd.Flag("matcher-cache-size", "Max number of cached matchers items. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize)

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

Expand Down
13 changes: 13 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type storeConfig struct {
postingGroupMaxKeySeriesRatio float64

indexHeaderLazyDownloadStrategy string

matcherCacheSize int
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -225,6 +227,8 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&sc.label)

cmd.Flag("matcher-cache-size", "Max number of cached matchers items. Using 0 disables caching.").Default("0").IntVar(&sc.matcherCacheSize)

sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
}

Expand Down Expand Up @@ -368,6 +372,14 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

var matchersCache = storecache.NoopMatchersCache
if conf.matcherCacheSize > 0 {
matchersCache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg))
if err != nil {
return errors.Wrap(err, "failed to create matchers cache")
}
}

var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
Expand Down Expand Up @@ -413,6 +425,7 @@ func runStore(
}),
store.WithRegistry(reg),
store.WithIndexCache(indexCache),
store.WithMatchersCache(matchersCache),
store.WithQueryGate(queriesGate),
store.WithChunkPool(chunkPool),
store.WithFilterConfig(conf.filterConf),
Expand Down
4 changes: 2 additions & 2 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ Flags:
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--log.level=info Log filtering level.
--matcher-cache-size=0 The size of the cache used for matching against
external labels. Using 0 disables caching.
--matcher-cache-size=0 Max number of cached matchers items. Using 0
disables caching.
--objstore.config=<content>
Alternative to 'objstore.config-file'
flag (mutually exclusive). Content of
Expand Down
2 changes: 2 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ Flags:
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--log.level=info Log filtering level.
--matcher-cache-size=0 Max number of cached matchers items. Using 0
disables caching.
--max-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
will serve only blocks, which happened earlier
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewMultiTSDB(
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
matcherCache: storecache.NewNoopMatcherCache(),
matcherCache: storecache.NoopMatchersCache,
}

for _, option := range options {
Expand Down
21 changes: 15 additions & 6 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ type BucketStore struct {
fetcher block.MetadataFetcher
dir string
indexCache storecache.IndexCache
matcherCache storecache.MatchersCache
indexReaderPool *indexheader.ReaderPool
buffers sync.Pool
chunkPool pool.Pool[byte]
Expand Down Expand Up @@ -517,6 +518,13 @@ func WithIndexCache(cache storecache.IndexCache) BucketStoreOption {
}
}

// WithMatchersCache returns an option that makes the BucketStore use cache as
// its matchers cache, replacing the storecache.NoopMatchersCache that
// NewBucketStore installs by default.
func WithMatchersCache(cache storecache.MatchersCache) BucketStoreOption {
	apply := func(store *BucketStore) {
		store.matcherCache = cache
	}
	return apply
}

// WithQueryGate sets a queryGate to use instead of a noopGate.
func WithQueryGate(queryGate gate.Gate) BucketStoreOption {
return func(s *BucketStore) {
Expand Down Expand Up @@ -637,11 +645,12 @@ func NewBucketStore(
options ...BucketStoreOption,
) (*BucketStore, error) {
s := &BucketStore{
logger: log.NewNopLogger(),
bkt: bkt,
fetcher: fetcher,
dir: dir,
indexCache: noopCache{},
logger: log.NewNopLogger(),
bkt: bkt,
fetcher: fetcher,
dir: dir,
indexCache: noopCache{},
matcherCache: storecache.NoopMatchersCache,
buffers: sync.Pool{New: func() interface{} {
b := make([]byte, 0, initialBufSize)
return &b
Expand Down Expand Up @@ -1536,7 +1545,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store

tenant, _ := tenancy.GetTenantFromGRPCMetadata(srv.Context())

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
matchers, err := storecache.MatchersToPromMatchersCached(s.matcherCache, req.Matchers...)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1682,6 +1682,8 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
MaxSize: 8889,
})
testutil.Ok(t, err)
matcherCache, err := storecache.NewMatchersCache(storecache.WithSize(10000))
testutil.Ok(t, err)

var b1 *bucketBlock

Expand Down Expand Up @@ -1775,6 +1777,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
store := &BucketStore{
bkt: objstore.WithNoopInstr(bkt),
logger: logger,
matcherCache: matcherCache,
indexCache: indexCache,
indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil), indexheader.AlwaysEagerDownloadIndexHeader),
metrics: newBucketStoreMetrics(nil),
Expand Down Expand Up @@ -2080,6 +2083,9 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

matcherCache, err := storecache.NewMatchersCache(storecache.WithSize(1024))
testutil.Ok(tb, err)

store, err := NewBucketStore(
instrBkt,
fetcher,
Expand All @@ -2096,6 +2102,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
0,
WithLogger(logger),
WithIndexCache(indexCache),
WithMatchersCache(matcherCache),
)
testutil.Ok(tb, err)
testutil.Ok(tb, store.SyncBlocks(context.Background()))
Expand Down
74 changes: 60 additions & 14 deletions pkg/store/cache/matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package storecache

import (
"strconv"
"strings"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -17,28 +19,37 @@ const DefaultCacheSize = 200

type NewItemFunc func() (*labels.Matcher, error)

// ConversionLabelMatcher is the read-only view of a label matcher that the
// matchers cache works with: the cache key is derived from the name, match
// type and value it reports. MatchersToPromMatchersCached passes
// *storepb.LabelMatcher values through this interface.
type ConversionLabelMatcher interface {
	// GetName returns the name component of the matcher.
	GetName() string
	// GetValue returns the value component of the matcher.
	GetValue() string
	// MatcherType returns the matcher's type as a Prometheus labels.MatchType,
	// or an error if it cannot be determined.
	MatcherType() (labels.MatchType, error)
}

type MatchersCache interface {
// GetOrSet retrieves a matcher from cache or creates and stores it if not present.
// If the matcher is not in cache, it uses the provided newItem function to create it.
GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error)
GetOrSet(m ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error)
}

// Ensure implementations satisfy the interface.
var (
_ MatchersCache = (*LruMatchersCache)(nil)
_ MatchersCache = (*NoopMatcherCache)(nil)
)
_ MatchersCache = (*LruMatchersCache)(nil)
NoopMatchersCache MatchersCache = &noopMatcherCache{}

// NoopMatcherCache is a no-op implementation of MatchersCache that doesn't cache anything.
type NoopMatcherCache struct{}
defaultIsCacheableFunc = func(m ConversionLabelMatcher) bool {
t, err := m.MatcherType()
if err != nil {
return false
}

// NewNoopMatcherCache creates a new no-op matcher cache.
func NewNoopMatcherCache() MatchersCache {
return &NoopMatcherCache{}
}
return t == labels.MatchRegexp || t == labels.MatchNotRegexp
}
)

// noopMatcherCache is a MatchersCache that stores nothing: its GetOrSet
// ignores the matcher and returns whatever newItem produces on every call.
type noopMatcherCache struct{}

// GetOrSet implements MatchersCache by always creating a new matcher without caching.
func (n *NoopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) {
func (n *noopMatcherCache) GetOrSet(_ ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
return newItem()
}

Expand All @@ -49,6 +60,8 @@ type LruMatchersCache struct {
metrics *matcherCacheMetrics
size int
sf singleflight.Group

isCacheable func(matcher ConversionLabelMatcher) bool
}

type MatcherCacheOption func(*LruMatchersCache)
Expand All @@ -65,15 +78,24 @@ func WithSize(size int) MatcherCacheOption {
}
}

// WithIsCacheableFunc sets the predicate that decides whether a matcher goes
// through the cache. Matchers the predicate rejects are built by the caller's
// newItem function on every GetOrSet call and are never stored.
//
// A nil f is ignored and the previously configured predicate is kept:
// GetOrSet calls the predicate unconditionally, so storing nil would make the
// first lookup panic.
func WithIsCacheableFunc(f func(matcher ConversionLabelMatcher) bool) MatcherCacheOption {
	return func(c *LruMatchersCache) {
		if f == nil {
			return
		}
		c.isCacheable = f
	}
}

func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
cache := &LruMatchersCache{
size: DefaultCacheSize,
size: DefaultCacheSize,
isCacheable: defaultIsCacheableFunc,
}

for _, opt := range opts {
opt(cache)
}
cache.metrics = newMatcherCacheMetrics(cache.reg)
cache.metrics.maxItems.Set(float64(cache.size))

lruCache, err := lru.NewWithEvict[string, *labels.Matcher](cache.size, cache.onEvict)
if err != nil {
Expand All @@ -84,8 +106,18 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
return cache, nil
}

func (c *LruMatchersCache) GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error) {
func (c *LruMatchersCache) GetOrSet(m ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
if !c.isCacheable(m) {
return newItem()
}

c.metrics.requestsTotal.Inc()
key, err := cacheKey(m)

if err != nil {
return nil, err
}

v, err, _ := c.sf.Do(key, func() (interface{}, error) {
if item, ok := c.cache.Get(key); ok {
c.metrics.hitsTotal.Inc()
Expand Down Expand Up @@ -151,11 +183,25 @@ func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics {
func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for i := range ms {
pm, err := cache.GetOrSet(ms[i].String(), func() (*labels.Matcher, error) { return storepb.MatcherToPromMatcher(ms[i]) })
pm, err := cache.GetOrSet(&ms[i], func() (*labels.Matcher, error) { return storepb.MatcherToPromMatcher(ms[i]) })
if err != nil {
return nil, err
}
res = append(res, pm)
}
return res, nil
}

// cacheKey builds the string under which the converted form of m is cached.
//
// The key has the form "<len(name)>:<name><type>:<value>", for example
// "3:job=~:api.*". Plain concatenation of name, type and value is ambiguous:
// name "a", type "=", value "~b" and name "a", type "=~", value "b" would
// both produce "a=~b", and a colliding lookup would return the wrong matcher.
// Length-prefixing the name fixes where the name ends, and the ':' after the
// type fixes where the type ends; the Prometheus match-type strings
// ("=", "!=", "=~", "!~") never contain ':'.
//
// It returns the error from m.MatcherType() unchanged when the type cannot be
// determined.
func cacheKey(m ConversionLabelMatcher) (string, error) {
	t, err := m.MatcherType()
	if err != nil {
		return "", err
	}

	name, value, typeStr := m.GetName(), m.GetValue(), t.String()
	nameLen := strconv.Itoa(len(name))

	var sb strings.Builder
	// Size the buffer up front (+2 for the two ':' separators) so the key is
	// built with a single allocation.
	sb.Grow(len(nameLen) + len(name) + len(typeStr) + len(value) + 2)
	sb.WriteString(nameLen)
	sb.WriteByte(':')
	sb.WriteString(name)
	sb.WriteString(typeStr)
	sb.WriteByte(':')
	sb.WriteString(value)
	return sb.String(), nil
}
Loading
Loading