Skip to content

Commit

Permalink
[Receive] Fix race condition when adding multiple new tenants at once (
Browse files Browse the repository at this point in the history
…#7941)

* [Receive] fix race condition

Signed-off-by: Yi Jin <[email protected]>

* add a change log

Signed-off-by: Yi Jin <[email protected]>

* memorize tsdb local clients without race condition

Signed-off-by: Yi Jin <[email protected]>

* fix data race in testing with some concurrent safe helper functions

Signed-off-by: Yi Jin <[email protected]>

* address comments

Signed-off-by: Yi Jin <[email protected]>

---------

Signed-off-by: Yi Jin <[email protected]>
Signed-off-by: Saswata Mukherjee <[email protected]>
  • Loading branch information
jnyi authored and saswatamcode committed Dec 3, 2024
1 parent 0b25e34 commit 58facfd
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7893](https://github.com/thanos-io/thanos/pull/7893) Sidecar: Fix retrieval of external labels for Prometheus v3.0.0.
- [#7903](https://github.com/thanos-io/thanos/pull/7903) Query: Fix panic on regex store matchers.
- [#7915](https://github.com/thanos-io/thanos/pull/7915) Store: Close block series client at the end to not reuse chunk buffer
- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892).

### Added

Expand Down
2 changes: 1 addition & 1 deletion docs/sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Queries against store gateway which are touching large number of blocks (no matt

# Relabelling

Similar to [promtail](https://grafana.com/docs/loki/latest/send-data/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.
Similar to [promtail](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.

Currently, thanos only supports the following relabel actions:

Expand Down
140 changes: 66 additions & 74 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,8 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

tsdbClients []store.Client
tsdbClientsNeedUpdate bool

exemplarClients map[string]*exemplars.TSDB
exemplarClientsNeedUpdate bool
tsdbClients []store.Client
exemplarClients map[string]*exemplars.TSDB

metricNameFilterEnabled bool
}
Expand Down Expand Up @@ -100,19 +97,19 @@ func NewMultiTSDB(
}

mt := &MultiTSDB{
dataDir: dataDir,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tsdbClientsNeedUpdate: true,
exemplarClientsNeedUpdate: true,
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
dataDir: dataDir,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tsdbClients: make([]store.Client, 0),
exemplarClients: map[string]*exemplars.TSDB{},
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
}

for _, option := range options {
Expand All @@ -122,6 +119,49 @@ func NewMultiTSDB(
return mt
}

// testGetTenant returns the tenant with the given tenantID for testing purposes.
func (t *MultiTSDB) testGetTenant(tenantID string) *tenant {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.tenants[tenantID]
}

func (t *MultiTSDB) updateTSDBClients() {
t.tsdbClients = t.tsdbClients[:0]
for _, tenant := range t.tenants {
client := tenant.client()
if client != nil {
t.tsdbClients = append(t.tsdbClients, client)
}
}
}

func (t *MultiTSDB) addTenantUnlocked(tenantID string, newTenant *tenant) {
t.tenants[tenantID] = newTenant
t.updateTSDBClients()
if newTenant.exemplars() != nil {
t.exemplarClients[tenantID] = newTenant.exemplars()
}
}

func (t *MultiTSDB) addTenantLocked(tenantID string, newTenant *tenant) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.addTenantUnlocked(tenantID, newTenant)
}

func (t *MultiTSDB) removeTenantUnlocked(tenantID string) {
delete(t.tenants, tenantID)
delete(t.exemplarClients, tenantID)
t.updateTSDBClients()
}

func (t *MultiTSDB) removeTenantLocked(tenantID string) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.removeTenantUnlocked(tenantID)
}

type localClient struct {
store *store.TSDBStore

Expand Down Expand Up @@ -416,9 +456,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
}

level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
delete(t.tenants, tenantID)
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
t.removeTenantUnlocked(tenantID)
}

return merr.Err()
Expand Down Expand Up @@ -578,58 +616,17 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error {
return merr.Err()
}

// TSDBLocalClients should be used as read-only.
func (t *MultiTSDB) TSDBLocalClients() []store.Client {
t.mtx.RLock()
if !t.tsdbClientsNeedUpdate {
t.mtx.RUnlock()
return t.tsdbClients
}

t.mtx.RUnlock()
t.mtx.Lock()
defer t.mtx.Unlock()
if !t.tsdbClientsNeedUpdate {
return t.tsdbClients
}

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
client := tenant.client()
if client != nil {
res = append(res, client)
}
}

t.tsdbClientsNeedUpdate = false
t.tsdbClients = res

defer t.mtx.RUnlock()
return t.tsdbClients
}

// TSDBExemplars should be used as read-only.
func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
t.mtx.RLock()
if !t.exemplarClientsNeedUpdate {
t.mtx.RUnlock()
return t.exemplarClients
}
t.mtx.RUnlock()
t.mtx.Lock()
defer t.mtx.Unlock()

if !t.exemplarClientsNeedUpdate {
return t.exemplarClients
}

res := make(map[string]*exemplars.TSDB, len(t.tenants))
for k, tenant := range t.tenants {
e := tenant.exemplars()
if e != nil {
res[k] = e
}
}

t.exemplarClientsNeedUpdate = false
t.exemplarClients = res
defer t.mtx.RUnlock()
return t.exemplarClients
}

Expand Down Expand Up @@ -705,11 +702,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
nil,
)
if err != nil {
t.mtx.Lock()
delete(t.tenants, tenantID)
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
t.mtx.Unlock()
t.removeTenantLocked(tenantID)
return err
}
var ship *shipper.Shipper
Expand All @@ -732,6 +725,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
options = append(options, store.WithCuckooMetricNameStoreFilter())
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset))
t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down Expand Up @@ -760,9 +754,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
}

tenant = newTenant()
t.tenants[tenantID] = tenant
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
t.addTenantUnlocked(tenantID, tenant)
t.mtx.Unlock()

logger := log.With(t.logger, "tenant", tenantID)
Expand Down
49 changes: 47 additions & 2 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package receive

import (
"context"
"fmt"
"io"
"math"
"os"
Expand Down Expand Up @@ -193,7 +194,7 @@ func TestMultiTSDB(t *testing.T) {
testutil.Ok(t, m.Open())
testutil.Ok(t, appendSample(m, testTenant, time.Now()))

tenant := m.tenants[testTenant]
tenant := m.testGetTenant(testTenant)
db := tenant.readyStorage().Get()

testutil.Equals(t, 0, len(db.Blocks()))
Expand Down Expand Up @@ -541,6 +542,47 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
testutil.Equals(t, 1, len(m.TSDBLocalClients()))
}

func TestMultiTSDBAddNewTenant(t *testing.T) {
t.Parallel()
const iterations = 10
// This test detects race conditions, so we run it multiple times to increase the chance of catching the issue.
for i := 0; i < iterations; i++ {
t.Run(fmt.Sprintf("iteration-%d", i), func(t *testing.T) {
dir := t.TempDir()
m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
&tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
},
labels.FromStrings("replica", "test"),
"tenant_id",
objstore.NewInMemBucket(),
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()

concurrency := 50
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
// simulate remote write with new tenant concurrently
go func(i int) {
defer wg.Done()
testutil.Ok(t, appendSample(m, fmt.Sprintf("tenant-%d", i), time.UnixMilli(int64(10))))
}(i)
// simulate read request concurrently
go func() {
m.TSDBLocalClients()
}()
}
wg.Wait()
testutil.Equals(t, concurrency, len(m.TSDBLocalClients()))
})
}
}

func TestAlignedHeadFlush(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -801,7 +843,10 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim

func queryLabelValues(ctx context.Context, m *MultiTSDB) error {
proxy := store.NewProxyStore(nil, nil, func() []store.Client {
clients := m.TSDBLocalClients()
m.mtx.Lock()
defer m.mtx.Unlock()
clients := make([]store.Client, len(m.tsdbClients))
copy(clients, m.tsdbClients)
if len(clients) > 0 {
clients[0] = &slowClient{clients[0]}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) {

for _, c := range tc.cfg {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand All @@ -319,7 +319,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) {

for _, c := range changedConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) {

for _, c := range initialConfig {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down Expand Up @@ -778,7 +778,7 @@ func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) {

for _, c := range cfg {
for _, tenantId := range c.Tenants {
if m.tenants[tenantId] == nil {
if m.testGetTenant(tenantId) == nil {
err = appendSample(m, tenantId, time.Now())
require.NoError(t, err)
}
Expand Down

0 comments on commit 58facfd

Please sign in to comment.