diff --git a/CHANGELOG.md b/CHANGELOG.md index 2be12a7f2..1fc33c7a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,5 +26,6 @@ * [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80 * [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97 * [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95 +* [ENHANCEMENT] Trigger metrics update on ring changes instead of doing it periodically to speed up tests that wait for certain metrics. #107 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 diff --git a/ring/ring.go b/ring/ring.go index b9a11b83a..9632cf174 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -285,21 +285,9 @@ func (r *Ring) starting(ctx context.Context) error { func (r *Ring) loop(ctx context.Context) error { // Update the ring metrics at start of the main loop. - r.updateRingMetrics() - go func() { - // Start metrics update ticker to update the ring metrics. - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - r.updateRingMetrics() - case <-ctx.Done(): - return - } - } - }() + r.mtx.Lock() + r.updateRingMetrics(Different) + r.mtx.Unlock() r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { if value == nil { @@ -334,6 +322,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { // when watching the ring for updates). r.mtx.Lock() r.ringDesc = ringDesc + r.updateRingMetrics(rc) r.mtx.Unlock() return } @@ -356,6 +345,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { // Invalidate all cached subrings. r.shuffledSubringCache = make(map[subringCacheKey]*Ring) } + r.updateRingMetrics(rc) } // Get returns n (or more) instances which form the replicas for the given key. 
@@ -564,21 +554,16 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { return numTokens, owned } -// updateRingMetrics updates ring metrics. -func (r *Ring) updateRingMetrics() { - r.mtx.RLock() - defer r.mtx.RUnlock() - - numTokens, ownedRange := r.countTokens() - for id, totalOwned := range ownedRange { - r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) - r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) +// updateRingMetrics updates ring metrics. Caller must be holding at least a Read lock! +func (r *Ring) updateRingMetrics(compareResult CompareResult) { + if compareResult == Equal { + return } numByState := map[string]int{} oldestTimestampByState := map[string]int64{} - // Initialised to zero so we emit zero-metrics (instead of not emitting anything) + // Initialized to zero so we emit zero-metrics (instead of not emitting anything) for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String()} { numByState[s] = 0 oldestTimestampByState[s] = 0 @@ -601,6 +586,17 @@ func (r *Ring) updateRingMetrics() { for state, timestamp := range oldestTimestampByState { r.oldestTimestampGaugeVec.WithLabelValues(state).Set(float64(timestamp)) } + + if compareResult == EqualButStatesAndTimestamps { + return + } + + numTokens, ownedRange := r.countTokens() + for id, totalOwned := range ownedRange { + r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) + r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) + } + r.totalTokensGauge.Set(float64(len(r.ringTokens))) } diff --git a/ring/ring_test.go b/ring/ring_test.go index 82193753c..35a91c691 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -81,6 +82,72 
@@ func generateKeys(r *rand.Rand, numTokens int, dest []uint32) {
 	}
 }
 
+// BenchmarkUpdateRingState benchmarks updateRingState across ring sizes, zone counts and tokens per instance.
+func BenchmarkUpdateRingState(b *testing.B) {
+	for _, numInstances := range []int{50, 100, 500} {
+		for _, numZones := range []int{1, 3} {
+			for _, numTokens := range []int{128, 256, 512} {
+				for _, updateTokens := range []bool{false, true} {
+					b.Run(fmt.Sprintf("num instances = %d, num zones = %d, num tokens = %d, update tokens = %t", numInstances, numZones, numTokens, updateTokens), func(b *testing.B) {
+						benchmarkUpdateRingState(b, numInstances, numZones, numTokens, updateTokens)
+					})
+				}
+			}
+		}
+	}
+}
+
+// benchmarkUpdateRingState times updateRingState while alternating between two ring descriptors.
+// With updateTokens the descriptors hold different tokens; otherwise only the instance state differs.
+func benchmarkUpdateRingState(b *testing.B, numInstances, numZones, numTokens int, updateTokens bool) {
+	cfg := Config{
+		KVStore:              kv.Config{},
+		HeartbeatTimeout:     0, // get healthy stats
+		ReplicationFactor:    3,
+		ZoneAwarenessEnabled: true,
+	}
+
+	// create the ring to set up metrics, but do not start
+	registry := prometheus.NewRegistry()
+	ring, err := NewWithStoreClientAndStrategy(cfg, testRingName, testRingKey, nil, NewDefaultReplicationStrategy(), registry, log.NewNopLogger())
+	require.NoError(b, err)
+
+	// Make a random ring with N instances and M tokens per instance, plus a second
+	// descriptor that differs either in tokens or only in instance state.
+	desc := NewDesc()
+	otherDesc := NewDesc()
+	takenTokens, otherTakenTokens := []uint32{}, []uint32{}
+	for i := 0; i < numInstances; i++ {
+		tokens := GenerateTokens(numTokens, takenTokens)
+		takenTokens = append(takenTokens, tokens...)
+		now := time.Now()
+		id, addr := strconv.Itoa(i), fmt.Sprintf("instance-%d", i)
+		// Spread instances round-robin over numZones zones so the parameter takes effect.
+		zone := strconv.Itoa(i % numZones)
+		desc.AddIngester(id, addr, zone, tokens, ACTIVE, now)
+		if updateTokens {
+			otherTokens := GenerateTokens(numTokens, otherTakenTokens)
+			otherTakenTokens = append(otherTakenTokens, otherTokens...)
+			otherDesc.AddIngester(id, addr, zone, otherTokens, ACTIVE, now)
+		} else {
+			otherDesc.AddIngester(id, addr, zone, tokens, JOINING, now)
+		}
+	}
+
+	if updateTokens {
+		require.Equal(b, Different, desc.RingCompare(otherDesc))
+	} else {
+		require.Equal(b, EqualButStatesAndTimestamps, desc.RingCompare(otherDesc))
+	}
+
+	// Alternate between the two descriptors so that every iteration sees a changed ring.
+	descs := [2]*Desc{desc, otherDesc}
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		ring.updateRingState(descs[n%2])
+	}
+}
+
 func TestDoBatchZeroInstances(t *testing.T) {
 	ctx := context.Background()
 	numKeys := 10