Skip to content

Commit

Permalink
tenant heads use sync.Map
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d committed May 5, 2022
1 parent b1a3e15 commit 64adebc
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 74 deletions.
119 changes: 48 additions & 71 deletions pkg/storage/stores/tsdb/head_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync"
"time"

"github.com/cespare/xxhash"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
Expand All @@ -31,10 +30,6 @@ type period time.Duration

const defaultRotationPeriod = period(15 * time.Minute)

// Do not specify without bit shifting. This allows us to
// do shard index calcuations via bitwise & rather than modulos.
const defaultHeadManagerStripeSize = 1 << 7

/*
HeadManager both accepts flushed chunk writes
and exposes the index interface for multiple tenants.
Expand Down Expand Up @@ -79,22 +74,19 @@ type HeadManager struct {
tsdbManager TSDBManager
active, prev *headWAL

shards int
activeHeads, prevHeads *tenantHeads

Index
}

func NewHeadManager(logger log.Logger, dir string, metrics *Metrics, tsdbManager TSDBManager) *HeadManager {
shards := defaultHeadManagerStripeSize
m := &HeadManager{
log: log.With(logger, "component", "tsdb-head-manager"),
dir: dir,
metrics: metrics,
tsdbManager: tsdbManager,

period: defaultRotationPeriod,
shards: shards,
}

m.Index = LazyIndex(func() (Index, error) {
Expand Down Expand Up @@ -185,7 +177,7 @@ func (m *HeadManager) Start() error {
return err
}

m.activeHeads = newTenantHeads(now, m.shards, m.metrics, m.log)
m.activeHeads = newTenantHeads(now, m.metrics, m.log)

for _, group := range walsByPeriod {
if group.period < (curPeriod) {
Expand Down Expand Up @@ -251,7 +243,7 @@ func (m *HeadManager) Rotate(t time.Time) error {
}

// create new tenant heads
nextHeads := newTenantHeads(t, m.shards, m.metrics, m.log)
nextHeads := newTenantHeads(t, m.metrics, m.log)

stopPrev := func(s string) {
if m.prev != nil {
Expand Down Expand Up @@ -485,31 +477,36 @@ type tenantHeads struct {
mint, maxt atomic.Int64 // easy lookup for Bounds() impl

start time.Time
shards int
locks []sync.RWMutex
tenants []map[string]*Head
tenants sync.Map
log log.Logger
chunkFilter chunk.RequestChunkFilterer
metrics *Metrics
}

func newTenantHeads(start time.Time, shards int, metrics *Metrics, logger log.Logger) *tenantHeads {
func newTenantHeads(start time.Time, metrics *Metrics, logger log.Logger) *tenantHeads {
res := &tenantHeads{
start: start,
shards: shards,
locks: make([]sync.RWMutex, shards),
tenants: make([]map[string]*Head, shards),
log: log.With(logger, "component", "tenant-heads"),
metrics: metrics,
}
for i := range res.tenants {
res.tenants[i] = make(map[string]*Head)
}

return res
}

func (t *tenantHeads) getOrCreateTenant(userID string) *Head {
x, ok := t.tenants.Load(userID)
if ok {
return x.(*Head)
}

head := NewHead(userID, t.metrics, t.log)
// User LoadOrStore in case another goroutine has written
// the same key
res, _ := t.tenants.LoadOrStore(userID, head)
return res.(*Head)
}

func (t *tenantHeads) Append(userID string, ls labels.Labels, chks index.ChunkMetas) *WALRecord {
idx := t.shardForTenant(userID)

var mint, maxt int64
for _, chk := range chks {
Expand All @@ -523,25 +520,8 @@ func (t *tenantHeads) Append(userID string, ls labels.Labels, chks index.ChunkMe
}
updateMintMaxt(mint, maxt, &t.mint, &t.maxt)

// First, check if this tenant has been created
var (
mtx = &t.locks[idx]
newStream bool
refID uint64
)
mtx.RLock()
if head, ok := t.tenants[idx][userID]; ok {
newStream, refID = head.Append(ls, chks)
mtx.RUnlock()
} else {
// tenant does not exist, so acquire write lock to insert it
mtx.RUnlock()
mtx.Lock()
head := NewHead(userID, t.metrics, t.log)
t.tenants[idx][userID] = head
newStream, refID = head.Append(ls, chks)
mtx.Unlock()
}
head := t.getOrCreateTenant(userID)
newStream, refID := head.Append(ls, chks)

rec := &WALRecord{
UserID: userID,
Expand All @@ -561,10 +541,6 @@ func (t *tenantHeads) Append(userID string, ls labels.Labels, chks index.ChunkMe
return rec
}

func (t *tenantHeads) shardForTenant(userID string) uint64 {
return xxhash.Sum64String(userID) & uint64(t.shards-1)
}

func (t *tenantHeads) Close() error { return nil }

func (t *tenantHeads) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
Expand All @@ -576,15 +552,12 @@ func (t *tenantHeads) Bounds() (model.Time, model.Time) {
}

func (t *tenantHeads) tenantIndex(userID string, from, through model.Time) (idx Index, ok bool) {
i := t.shardForTenant(userID)
t.locks[i].RLock()
defer t.locks[i].RUnlock()
tenant, ok := t.tenants[i][userID]
res, ok := t.tenants.Load(userID)
if !ok {
return
return nil, false
}

idx = NewTSDBIndex(tenant.indexRange(int64(from), int64(through)))
idx = NewTSDBIndex(res.(*Head).indexRange(int64(from), int64(through)))
if t.chunkFilter != nil {
idx.SetChunkFilterer(t.chunkFilter)
}
Expand Down Expand Up @@ -631,33 +604,37 @@ func (t *tenantHeads) LabelValues(ctx context.Context, userID string, from, thro

// helper only used in building TSDBs
func (t *tenantHeads) forAll(fn func(user string, ls labels.Labels, chks index.ChunkMetas)) error {
for i, shard := range t.tenants {
t.locks[i].RLock()
defer t.locks[i].RUnlock()

for user, tenant := range shard {
idx := tenant.Index()
ps, err := postingsForMatcher(idx, nil, labels.MustNewMatcher(labels.MatchEqual, "", ""))
if err != nil {
return err
}
var firstErr error

for ps.Next() {
var (
ls labels.Labels
chks []index.ChunkMeta
)
t.tenants.Range(func(key, value interface{}) bool {
user := key.(string)
tenant := value.(*Head)

_, err := idx.Series(ps.At(), &ls, &chks)
idx := tenant.Index()
ps, err := postingsForMatcher(idx, nil, labels.MustNewMatcher(labels.MatchEqual, "", ""))
if err != nil {
firstErr = err
return false
}

if err != nil {
return errors.Wrapf(err, "iterating postings for tenant: %s", user)
}
for ps.Next() {
var (
ls labels.Labels
chks []index.ChunkMeta
)

fn(user, ls, chks)
_, err := idx.Series(ps.At(), &ls, &chks)

if err != nil {
firstErr = errors.Wrapf(err, "iterating postings for tenant: %s", user)
return false
}

fn(user, ls, chks)
}
}
return true
})

return nil
return firstErr
}
4 changes: 2 additions & 2 deletions pkg/storage/stores/tsdb/head_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func chunkMetasToChunkRefs(user string, fp uint64, xs index.ChunkMetas) (res []C

// Test append
func Test_TenantHeads_Append(t *testing.T) {
h := newTenantHeads(time.Now(), defaultHeadManagerStripeSize, NewMetrics(nil), log.NewNopLogger())
h := newTenantHeads(time.Now(), NewMetrics(nil), log.NewNopLogger())
ls := mustParseLabels(`{foo="bar"}`)
chks := []index.ChunkMeta{
{
Expand Down Expand Up @@ -64,7 +64,7 @@ func Test_TenantHeads_Append(t *testing.T) {

// Test multitenant reads
func Test_TenantHeads_MultiRead(t *testing.T) {
h := newTenantHeads(time.Now(), defaultHeadManagerStripeSize, NewMetrics(nil), log.NewNopLogger())
h := newTenantHeads(time.Now(), NewMetrics(nil), log.NewNopLogger())
ls := mustParseLabels(`{foo="bar"}`)
chks := []index.ChunkMeta{
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/tsdb/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error
}()

level.Debug(m.log).Log("msg", "recovering tenant heads")
tmp := newTenantHeads(t, defaultHeadManagerStripeSize, m.metrics, m.log)
tmp := newTenantHeads(t, m.metrics, m.log)
if err = recoverHead(m.dir, tmp, ids); err != nil {
return errors.Wrap(err, "building TSDB from WALs")
}
Expand Down

0 comments on commit 64adebc

Please sign in to comment.