diff --git a/ring/ring.go b/ring/ring.go index b9a11b83a..77164be2a 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -286,6 +286,8 @@ func (r *Ring) starting(ctx context.Context) error { func (r *Ring) loop(ctx context.Context) error { // Update the ring metrics at start of the main loop. r.updateRingMetrics() + + metricChan := make(chan struct{}, 1) go func() { // Start metrics update ticker to update the ring metrics. ticker := time.NewTicker(10 * time.Second) @@ -293,6 +295,8 @@ func (r *Ring) loop(ctx context.Context) error { for { select { + case <-metricChan: + r.updateRingMetrics() case <-ticker.C: r.updateRingMetrics() case <-ctx.Done(): @@ -307,13 +311,20 @@ func (r *Ring) loop(ctx context.Context) error { return true } - r.updateRingState(value.(*Desc)) + rc := r.updateRingState(value.(*Desc)) + if rc != Equal { + select { + case metricChan <- struct{}{}: + default: + } + } + return true }) return nil } -func (r *Ring) updateRingState(ringDesc *Desc) { +func (r *Ring) updateRingState(ringDesc *Desc) CompareResult { r.mtx.RLock() prevRing := r.ringDesc r.mtx.RUnlock() @@ -335,7 +346,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.mtx.Lock() r.ringDesc = ringDesc r.mtx.Unlock() - return + return rc } now := time.Now() @@ -356,6 +367,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { // Invalidate all cached subrings. r.shuffledSubringCache = make(map[subringCacheKey]*Ring) } + return rc } // Get returns n (or more) instances which form the replicas for the given key. @@ -564,12 +576,46 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { return numTokens, owned } -// updateRingMetrics updates ring metrics. +// countTokens returns the number of tokens and tokens within the range for each instance. +func countTokensUtil(ringDesc *Desc) (map[string]uint32, map[string]uint32, int) { + ringTokens := ringDesc.GetTokens() + ringInstanceByToken := ringDesc.getTokensInfo() + owned := map[string]uint32{} + numTokens := map[string]uint32{} + for i, token := range ringTokens { + var diff uint32 + + // Compute how many tokens are within the range. + if i+1 == len(ringTokens) { + diff = (math.MaxUint32 - token) + ringTokens[0] + } else { + diff = ringTokens[i+1] - token + } + + info := ringInstanceByToken[token] + numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1 + owned[info.InstanceID] = owned[info.InstanceID] + diff + } + + // Set to 0 the number of owned tokens by instances which don't have tokens yet. + for id := range ringDesc.Ingesters { + if _, ok := owned[id]; !ok { + owned[id] = 0 + numTokens[id] = 0 + } + } + + return numTokens, owned, len(ringTokens) +} + func (r *Ring) updateRingMetrics() { - r.mtx.RLock() - defer r.mtx.RUnlock() + r.mtx.Lock() + // There is no copy here as the ringDesc object is not modified over time, + // just replaced. + ringDesc := r.ringDesc + r.mtx.Unlock() - numTokens, ownedRange := r.countTokens() + numTokens, ownedRange, ringTokenCount := countTokensUtil(ringDesc) for id, totalOwned := range ownedRange { r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) @@ -578,13 +624,13 @@ func (r *Ring) updateRingMetrics() { numByState := map[string]int{} oldestTimestampByState := map[string]int64{} - // Initialised to zero so we emit zero-metrics (instead of not emitting anything) + // Initialized to zero so we emit zero-metrics (instead of not emitting anything) for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String()} { numByState[s] = 0 oldestTimestampByState[s] = 0 } - for _, instance := range r.ringDesc.Ingesters { + for _, instance := range ringDesc.Ingesters { s := instance.State.String() if !r.IsHealthy(&instance, Reporting, time.Now()) { s = unhealthy @@ -601,7 +647,7 @@ func (r *Ring) updateRingMetrics() { for state, timestamp := range oldestTimestampByState { r.oldestTimestampGaugeVec.WithLabelValues(state).Set(float64(timestamp)) } - r.totalTokensGauge.Set(float64(len(r.ringTokens))) + r.totalTokensGauge.Set(float64(ringTokenCount)) } // ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)