Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive/multitsdb: add cuckoo filter on metric names #7787

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ import (
"github.com/thanos-io/thanos/pkg/tls"
)

const compressionNone = "none"
const (
	// compressionNone is the literal "none".
	// NOTE(review): presumably the value that disables compression; its use is outside this view — confirm.
	compressionNone = "none"
	// metricNamesFilter is the --enable-feature value that turns on the
	// metric name filter for the receiver's MultiTSDB.
	metricNamesFilter = "metric-names-filter"
)

func registerReceive(app *extkingpin.App) {
cmd := app.Command(component.Receive.String(), "Accept Prometheus remote write API requests and write to local tsdb.")
Expand Down Expand Up @@ -136,6 +139,14 @@ func runReceive(

level.Info(logger).Log("mode", receiveMode, "msg", "running receive")

multiTSDBOptions := []receive.MultiTSDBOption{}
for _, feature := range *conf.featureList {
if feature == metricNamesFilter {
multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled())
level.Info(logger).Log("msg", "metric name filter feature enabled")
}
}

rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA)
if err != nil {
return err
Expand Down Expand Up @@ -215,6 +226,7 @@ func runReceive(
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
multiTSDBOptions...,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{
Intern: conf.writerInterning,
Expand Down Expand Up @@ -845,6 +857,8 @@ type receiveConfig struct {
limitsConfigReloadTimer time.Duration

asyncForwardWorkerCount uint

featureList *[]string
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -985,6 +999,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
cmd.Flag("receive.limits-config-reload-timer", "Minimum amount of time to pass for the limit configuration to be reloaded. Helps to avoid excessive reloads.").
Default("1s").Hidden().DurationVar(&rc.limitsConfigReloadTimer)

rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
Expand Down
3 changes: 3 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ Flags:
detected maximum container or system memory.
--enable-auto-gomemlimit Enable go runtime to automatically limit memory
consumption.
--enable-feature= ... Comma separated experimental feature names
to enable. The current list of features is
metric-names-filter.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,14 @@ require (
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.34.2
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771
go.opentelemetry.io/contrib/propagators/autoprop v0.54.0
go4.org/intern v0.0.0-20230525184215-6c62f75575cb
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
)

require github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect

require (
cloud.google.com/go/auth v0.5.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/digitalocean/godo v1.117.0 h1:WVlTe09melDYTd7VCVyvHcNWbgB+uI1O115+5LOtdSw=
github.com/digitalocean/godo v1.117.0/go.mod h1:Vk0vpCot2HOAJwc5WE8wljZGtJ3ZtWIc8MQ8rF38sdo=
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
Expand Down Expand Up @@ -2175,6 +2177,8 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybL
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.27 h1:yGAraK1uUjlhSXgNMIy8o/J4LFNcy7yeipBqt9N9mVg=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.27/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg=
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4=
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sercand/kuberesolver/v5 v5.1.1 h1:CYH+d67G0sGBj7q5wLK61yzqJJ8gLLC8aeprPTHb6yY=
github.com/sercand/kuberesolver/v5 v5.1.1/go.mod h1:Fs1KbKhVRnB2aDWN12NjKCB+RgYMWZJ294T3BtmVCpQ=
github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw=
Expand Down Expand Up @@ -3206,6 +3210,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
Expand Down
45 changes: 45 additions & 0 deletions pkg/filter/cuckoo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package filter

import (
"sync"
"unsafe"

"github.com/prometheus/prometheus/model/labels"
cuckoo "github.com/seiflotfy/cuckoofilter"
)

// CuckooMetricNameStoreFilter is a StoreFilter that remembers a set of metric
// names in a cuckoo filter and uses it to answer whether a `__name__="..."`
// equality matcher can possibly match.
//
// A cuckoo filter may report false positives, but it must never report false
// negatives here, since a negative answer excludes the store from a query.
// That guarantee only holds while every Insert succeeds, so the filter falls
// back to accepting everything once it has overflowed.
type CuckooMetricNameStoreFilter struct {
	filter *cuckoo.Filter
	// saturated is true when the last ResetAndSet could not insert every
	// value. Lookups are unreliable in that state, so Matches accepts all.
	saturated bool
	mtx       sync.RWMutex
}

// NewCuckooMetricNameStoreFilter returns an empty filter backed by a cuckoo
// filter created with the given capacity.
func NewCuckooMetricNameStoreFilter(capacity uint) *CuckooMetricNameStoreFilter {
	return &CuckooMetricNameStoreFilter{
		filter: cuckoo.NewFilter(capacity),
	}
}

// Matches reports whether the matchers may select data tracked by the filter.
//
// Only the first equality matcher on the metric name label is consulted; its
// value is looked up in the cuckoo filter. Matchers without such an entry,
// and any lookup against a saturated filter, are always accepted.
func (f *CuckooMetricNameStoreFilter) Matches(matchers []*labels.Matcher) bool {
	f.mtx.RLock()
	defer f.mtx.RUnlock()

	// Fail open: an overflowed filter can produce false negatives, which
	// would silently drop this store from queries for existing metrics.
	if f.saturated {
		return true
	}

	for _, m := range matchers {
		if m.Type == labels.MatchEqual && m.Name == labels.MetricName {
			return f.filter.Lookup([]byte(m.Value))
		}
	}

	return true
}

// ResetAndSet clears the filter and repopulates it with the given values.
//
// If the underlying cuckoo filter rejects an insert (it is full), the filter
// is marked as saturated and Matches accepts everything until the next
// ResetAndSet call in which every insert succeeds.
func (f *CuckooMetricNameStoreFilter) ResetAndSet(values ...string) {
	f.mtx.Lock()
	defer f.mtx.Unlock()

	f.filter.Reset()
	f.saturated = false
	for _, value := range values {
		// Zero-copy view of the string's bytes to avoid an allocation per value.
		// NOTE(review): assumes Insert only reads the slice and does not retain it — confirm against the cuckoofilter implementation.
		if !f.filter.Insert(unsafe.Slice(unsafe.StringData(value), len(value))) {
			// A failed insert may also have evicted an earlier fingerprint,
			// so the contents can no longer be trusted; stop filling.
			f.saturated = true
			return
		}
	}
}
22 changes: 22 additions & 0 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package filter

import "github.com/prometheus/prometheus/model/labels"

// StoreFilter is the interface implemented by the filters in this package
// (AllowAllStoreFilter and CuckooMetricNameStoreFilter): a value set that can
// be rebuilt from scratch and queried with label matchers.
type StoreFilter interface {
	// Matches returns true if the filter matches the given matchers.
	Matches(matchers []*labels.Matcher) bool

	// ResetAndSet resets the filter and sets it to the given values.
	ResetAndSet(values ...string)
}

// AllowAllStoreFilter is a stateless filter that accepts every set of
// matchers. Its method set mirrors StoreFilter.
type AllowAllStoreFilter struct{}

// Matches always reports true, whatever matchers are passed in.
func (AllowAllStoreFilter) Matches(_ []*labels.Matcher) bool {
	return true
}

// ResetAndSet ignores the given values; there is no state to rebuild.
func (AllowAllStoreFilter) ResetAndSet(_ ...string) {}
4 changes: 4 additions & 0 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,10 @@ func (er *endpointRef) apisPresent() []string {
return apisPresent
}

// Matches always reports true: this endpoint applies no matcher-based
// filtering of its own.
func (er *endpointRef) Matches(_ []*labels.Matcher) bool {
	return true
}

type endpointMetadata struct {
*infopb.InfoResponse
}
Expand Down
34 changes: 32 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ type MultiTSDB struct {

exemplarClients map[string]*exemplars.TSDB
exemplarClientsNeedUpdate bool

metricNameFilterEnabled bool
}

// MultiTSDBOption is a functional option for MultiTSDB. NewMultiTSDB applies
// each supplied option, in order, to the instance it has just built.
type MultiTSDBOption func(mt *MultiTSDB)

// WithMetricNameFilterEnabled returns an option that sets the
// metricNameFilterEnabled flag on the MultiTSDB it is applied to, which
// enables metric name filtering on TSDB clients.
func WithMetricNameFilterEnabled() MultiTSDBOption {
	enable := func(mt *MultiTSDB) { mt.metricNameFilterEnabled = true }
	return enable
}

// NewMultiTSDB creates new MultiTSDB.
Expand All @@ -84,12 +96,13 @@ func NewMultiTSDB(
bucket objstore.Bucket,
allowOutOfOrderUpload bool,
hashFunc metadata.HashFunc,
options ...MultiTSDBOption,
) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
}

return &MultiTSDB{
mt := &MultiTSDB{
dataDir: dataDir,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
Expand All @@ -104,6 +117,12 @@ func NewMultiTSDB(
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
}

for _, option := range options {
option(mt)
}

return mt
}

type localClient struct {
Expand Down Expand Up @@ -179,6 +198,10 @@ func newLocalClient(store *store.TSDBStore) *localClient {
}
}

// Matches reports whether the wrapped store accepts the given matchers; the
// decision is delegated entirely to the store.
func (l *localClient) Matches(matchers []*labels.Matcher) bool {
	accepted := l.store.Matches(matchers)
	return accepted
}

func (l *localClient) LabelSets() []labels.Labels {
return labelpb.ZLabelSetsToPromLabelSets(l.store.LabelSet()...)
}
Expand Down Expand Up @@ -302,6 +325,9 @@ func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *ship
}

func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) {
if storeTSDB == nil && t.storeTSDB != nil {
t.storeTSDB.Close()
fpetkovski marked this conversation as resolved.
Show resolved Hide resolved
}
t.storeTSDB = storeTSDB
t.ship = ship
t.exemplarsTSDB = exemplarsTSDB
Expand Down Expand Up @@ -751,7 +777,11 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
shipper.DefaultMetaFilename,
)
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset))
options := []store.TSDBStoreOption{}
if t.metricNameFilterEnabled {
options = append(options, store.WithCuckooMetricNameStoreFilter())
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset))
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Client interface {
// Addr returns address of the store client. If second parameter is true, the client
// represents a local client (server-as-client) and has no remote address.
Addr() (addr string, isLocalClient bool)

// Matches returns true if provided label matchers are allowed in the store.
Matches(matches []*labels.Matcher) bool
}

// ProxyStore implements the store API that proxies request to all given underlying stores.
Expand Down Expand Up @@ -590,6 +593,11 @@ func storeMatches(ctx context.Context, s Client, mint, maxt int64, matchers ...*
if !LabelSetsMatch(matchers, extLset...) {
return false, fmt.Sprintf("external labels %v does not match request label matchers: %v", extLset, matchers)
}

if !s.Matches(matchers) {
return false, fmt.Sprintf("store does not match filter for matchers: %v", matchers)
}

return true, ""
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1925,6 +1925,30 @@ func TestStoreMatches(t *testing.T) {
maxt: 1,
expectedMatch: true,
},
{
s: &storetestutil.TestClient{ExtLset: []labels.Labels{labels.FromStrings("a", "b")}},
ms: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "a", "b"),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric_name"),
},
maxt: 1,
expectedMatch: true,
},
{
s: &storetestutil.TestClient{
ExtLset: []labels.Labels{
labels.FromStrings("a", "b"),
},
StoreFilterNotMatches: true,
},
ms: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "a", "b"),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric_name"),
},
maxt: 1,
expectedMatch: false,
expectedReason: "store does not match filter for matchers: [a=\"b\" __name__=\"test_metric_name\"]",
},
} {
t.Run("", func(t *testing.T) {
ok, reason := storeMatches(context.TODO(), c.s, c.mint, c.maxt, c.ms...)
Expand Down
16 changes: 9 additions & 7 deletions pkg/store/storepb/testutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ type TestClient struct {
WithoutReplicaLabelsEnabled bool
IsLocalStore bool
StoreTSDBInfos []infopb.TSDBInfo
StoreFilterNotMatches bool
}

func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset }
func (c TestClient) TimeRange() (mint, maxt int64) { return c.MinTime, c.MaxTime }
func (c TestClient) TSDBInfos() []infopb.TSDBInfo { return c.StoreTSDBInfos }
func (c TestClient) SupportsSharding() bool { return c.Shardable }
func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled }
func (c TestClient) String() string { return c.Name }
func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore }
// LabelSets returns the configured ExtLset.
func (c TestClient) LabelSets() []labels.Labels {
	return c.ExtLset
}

// TimeRange returns the configured MinTime and MaxTime.
func (c TestClient) TimeRange() (mint, maxt int64) {
	return c.MinTime, c.MaxTime
}

// TSDBInfos returns the configured StoreTSDBInfos.
func (c TestClient) TSDBInfos() []infopb.TSDBInfo {
	return c.StoreTSDBInfos
}

// SupportsSharding returns the Shardable field.
func (c TestClient) SupportsSharding() bool {
	return c.Shardable
}

// SupportsWithoutReplicaLabels returns the WithoutReplicaLabelsEnabled field.
func (c TestClient) SupportsWithoutReplicaLabels() bool {
	return c.WithoutReplicaLabelsEnabled
}

// String returns the client's Name.
func (c TestClient) String() string {
	return c.Name
}

// Addr returns the client's Name as its address together with IsLocalStore.
func (c TestClient) Addr() (string, bool) {
	return c.Name, c.IsLocalStore
}

// Matches reports true unless StoreFilterNotMatches is set; the matchers
// themselves are ignored.
func (c TestClient) Matches(_ []*labels.Matcher) bool {
	return !c.StoreFilterNotMatches
}
Loading
Loading