Skip to content

Commit

Permalink
address PR comments, use options in tsbd initializations
Browse files Browse the repository at this point in the history
Signed-off-by: Mindaugas Niaura <[email protected]>
  • Loading branch information
niaurys committed Oct 1, 2024
1 parent 79a803c commit d4a2053
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 54 deletions.
6 changes: 3 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ func runReceive(

level.Info(logger).Log("mode", receiveMode, "msg", "running receive")

var metricNameFilterEnabled bool
multiTSDBOptions := []receive.MultiTSDBOption{}
for _, feature := range *conf.featureList {
if feature == metricNamesFilter {
metricNameFilterEnabled = true
multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled())
level.Info(logger).Log("msg", "metric name filter feature enabled")
}
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func runReceive(
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
metricNameFilterEnabled,
multiTSDBOptions...,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{
Intern: conf.writerInterning,
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func runRule(
}
infoOptions := []info.ServerOptionFunc{info.WithRulesInfoFunc()}
if tsdbDB != nil {
tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset, false)
tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset)
infoOptions = append(
infoOptions,
info.WithLabelSetFunc(func() []*labelpb.LabelSet {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore {
c := &storetestutil.TestClient{
Name: "1",
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, labels.EmptyLabels(), false)),
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, labels.EmptyLabels())),
MinTime: math.MinInt64, MaxTime: math.MaxInt64,
}

Expand Down
1 change: 0 additions & 1 deletion pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,6 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m, &WriterOptions{})
Expand Down
27 changes: 23 additions & 4 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ type MultiTSDB struct {
metricNameFilterEnabled bool
}

// MultiTSDBOption is a functional option for MultiTSDB.
type MultiTSDBOption func(mt *MultiTSDB)

// WithMetricNameFilterEnabled enables metric name filtering on TSDB clients.
func WithMetricNameFilterEnabled() MultiTSDBOption {
return func(s *MultiTSDB) {
s.metricNameFilterEnabled = true
}
}

// NewMultiTSDB creates new MultiTSDB.
// NOTE: Passed labels must be sorted lexicographically (alphabetically).
func NewMultiTSDB(
Expand All @@ -86,28 +96,33 @@ func NewMultiTSDB(
bucket objstore.Bucket,
allowOutOfOrderUpload bool,
hashFunc metadata.HashFunc,
metricNameFilterEnabled bool,
options ...MultiTSDBOption,
) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
}

return &MultiTSDB{
mt := &MultiTSDB{
dataDir: dataDir,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
metricNameFilterEnabled: metricNameFilterEnabled,
tsdbClientsNeedUpdate: true,
exemplarClientsNeedUpdate: true,
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
}

for _, option := range options {
option(mt)
}

return mt
}

type localClient struct {
Expand Down Expand Up @@ -762,7 +777,11 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
shipper.DefaultMetaFilename,
)
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, t.metricNameFilterEnabled), s, ship, exemplars.NewTSDB(s, lset))
options := []store.TSDBStoreOption{}
if t.metricNameFilterEnabled {
options = append(options, store.WithCuckooMetricNameStoreFilter())
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset))
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -142,7 +141,6 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -186,7 +184,6 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -454,7 +451,6 @@ func TestMultiTSDBPrune(t *testing.T) {
test.bucket,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -528,7 +524,6 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
objstore.NewInMemBucket(),
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -590,7 +585,6 @@ func TestAlignedHeadFlush(t *testing.T) {
test.bucket,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -665,7 +659,6 @@ func TestMultiTSDBStats(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -695,7 +688,6 @@ func TestMultiTSDBWithNilStore(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -737,7 +729,6 @@ func TestProxyLabelValues(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -828,7 +819,6 @@ func BenchmarkMultiTSDB(b *testing.B) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(b, m.Close()) }()

Expand Down
1 change: 0 additions & 1 deletion pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,6 @@ func initializeMultiTSDB(dir string) *MultiTSDB {
bucket,
false,
metadata.NoneFunc,
false,
)

return m
Expand Down
2 changes: 0 additions & 2 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ func TestWriter(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
t.Cleanup(func() { testutil.Ok(t, m.Close()) })

Expand Down Expand Up @@ -437,7 +436,6 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
nil,
false,
metadata.NoneFunc,
false,
)
b.Cleanup(func() { testutil.Ok(b, m.Close()) })

Expand Down
4 changes: 2 additions & 2 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func TestTSDBStore_Acceptance(t *testing.T) {
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))

return NewTSDBStore(nil, db, component.Rule, extLset, false)
return NewTSDBStore(nil, db, component.Rule, extLset)
}

testStoreAPIsAcceptance(t, startStore)
Expand Down Expand Up @@ -1173,7 +1173,7 @@ func TestProxyStoreWithReplicas_Acceptance(t *testing.T) {
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))

return NewTSDBStore(nil, db, component.Rule, extLset, false)
return NewTSDBStore(nil, db, component.Rule, extLset)

}

Expand Down
66 changes: 41 additions & 25 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,28 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
)

const RemoteReadFrameLimit = 1048576
const (
RemoteReadFrameLimit = 1048576
cuckooStoreFilterCapacity = 1000000
storeFilterUpdateInterval = 15 * time.Second
)

type TSDBReader interface {
storage.ChunkQueryable
StartTime() (int64, error)
}

// TSDBStoreOption is a functional option for TSDBStore.
type TSDBStoreOption func(s *TSDBStore)

// WithCuckooMetricNameStoreFilter returns a TSDBStoreOption that enables the Cuckoo filter for metric names.
func WithCuckooMetricNameStoreFilter() TSDBStoreOption {
return func(s *TSDBStore) {
s.storeFilter = filter.NewCuckooMetricNameStoreFilter(cuckooStoreFilterCapacity)
s.startStoreFilterUpdate = true
}
}

// TSDBStore implements the store API against a local TSDB instance.
// It attaches the provided external labels to all results. It only responds with raw data
// and does not support downsampling.
Expand All @@ -48,10 +63,11 @@ type TSDBStore struct {
buffers sync.Pool
maxBytesPerFrame int

extLset labels.Labels
storeFilter filter.StoreFilter
mtx sync.RWMutex
close func()
extLset labels.Labels
startStoreFilterUpdate bool
storeFilter filter.StoreFilter
mtx sync.RWMutex
close func()
storepb.UnimplementedStoreServer
}

Expand All @@ -73,43 +89,41 @@ type ReadWriteTSDBStore struct {

// NewTSDBStore creates a new TSDBStore.
// NOTE: Given lset has to be sorted.
func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels, metricNameFilterEnabled bool) *TSDBStore {
func NewTSDBStore(
logger log.Logger,
db TSDBReader,
component component.StoreAPI,
extLset labels.Labels,
options ...TSDBStoreOption,
) *TSDBStore {
if logger == nil {
logger = log.NewNopLogger()
}

var (
storeFilter filter.StoreFilter
startFilterUpdate bool
)

storeFilter = filter.AllowAllStoreFilter{}
if metricNameFilterEnabled {
startFilterUpdate = true
storeFilter = filter.NewCuckooMetricNameStoreFilter(1000000) // about 1MB on 64bit machines.
}

st := &TSDBStore{
logger: logger,
db: db,
component: component,
extLset: extLset,
storeFilter: storeFilter,
maxBytesPerFrame: RemoteReadFrameLimit,
storeFilter: filter.AllowAllStoreFilter{},
close: func() {},
buffers: sync.Pool{New: func() interface{} {
b := make([]byte, 0, initialBufSize)
return &b
}},
}

if startFilterUpdate {
t := time.NewTicker(15 * time.Second)
for _, option := range options {
option(st)
}

if st.startStoreFilterUpdate {
ctx, cancel := context.WithCancel(context.Background())
updateFilter := func() {
vals, err := st.LabelValues(context.Background(), &storepb.LabelValuesRequest{

updateFilter := func(ctx context.Context) {
vals, err := st.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: model.MetricNameLabel,
Start: 0,
End: math.MaxInt64,
})
if err != nil {
Expand All @@ -120,13 +134,15 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI
st.storeFilter.ResetAndSet(vals.Values...)
}
st.close = cancel
updateFilter()
updateFilter(ctx)

t := time.NewTicker(storeFilterUpdateInterval)

go func() {
for {
select {
case <-t.C:
updateFilter()
updateFilter(ctx)
case <-ctx.Done():
return
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestTSDBStore_Series_ChunkChecksum(t *testing.T) {
defer func() { testutil.Ok(t, db.Close()) }()
testutil.Ok(t, err)

tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west"), false)
tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west"))

appender := db.Appender(context.Background())

Expand Down Expand Up @@ -251,7 +251,7 @@ func TestTSDBStore_SeriesAccessWithDelegateClosing(t *testing.T) {
})

extLabels := labels.FromStrings("ext", "1")
store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false)
store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels)

srv := storetestutil.NewSeriesServer(context.Background())
csrv := &delegatorServer{SeriesServer: srv}
Expand Down Expand Up @@ -414,7 +414,7 @@ func TestTSDBStore_SeriesAccessWithoutDelegateClosing(t *testing.T) {
})

extLabels := labels.FromStrings("ext", "1")
store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false)
store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels)

srv := storetestutil.NewSeriesServer(context.Background())
t.Run("call series and access results", func(t *testing.T) {
Expand Down Expand Up @@ -555,7 +555,7 @@ func benchTSDBStoreSeries(t testutil.TB, totalSamples, totalSeries int) {
defer func() { testutil.Ok(t, db.Close()) }()

extLabels := labels.FromStrings("ext", "1")
store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false)
store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels)

var expected []*storepb.Series
for _, resp := range resps {
Expand Down

0 comments on commit d4a2053

Please sign in to comment.