Skip to content

Commit

Permalink
Merge pull request #107 from grafana/enhance/reactive-metrics
Browse files Browse the repository at this point in the history
ring/ring.go: reactive update of metrics
  • Loading branch information
krajorama authored Dec 29, 2021
2 parents 92f34b5 + 0f62f3a commit 87bd95f
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
* [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
* [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97
* [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95
* [ENHANCEMENT] Trigger metrics update on ring changes instead of doing it periodically to speed up tests that wait for certain metrics. #107
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
46 changes: 21 additions & 25 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,21 +285,9 @@ func (r *Ring) starting(ctx context.Context) error {

func (r *Ring) loop(ctx context.Context) error {
// Update the ring metrics at start of the main loop.
r.updateRingMetrics()
go func() {
// Start metrics update ticker to update the ring metrics.
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
r.updateRingMetrics()
case <-ctx.Done():
return
}
}
}()
r.mtx.Lock()
r.updateRingMetrics(Different)
r.mtx.Unlock()

r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool {
if value == nil {
Expand Down Expand Up @@ -334,6 +322,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
// when watching the ring for updates).
r.mtx.Lock()
r.ringDesc = ringDesc
r.updateRingMetrics(rc)
r.mtx.Unlock()
return
}
Expand All @@ -356,6 +345,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
// Invalidate all cached subrings.
r.shuffledSubringCache = make(map[subringCacheKey]*Ring)
}
r.updateRingMetrics(rc)
}

// Get returns n (or more) instances which form the replicas for the given key.
Expand Down Expand Up @@ -564,21 +554,16 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
return numTokens, owned
}

// updateRingMetrics updates ring metrics.
func (r *Ring) updateRingMetrics() {
r.mtx.RLock()
defer r.mtx.RUnlock()

numTokens, ownedRange := r.countTokens()
for id, totalOwned := range ownedRange {
r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32))
r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id]))
// updateRingMetrics updates ring metrics. Caller must be holding at least a Read lock!
func (r *Ring) updateRingMetrics(compareResult CompareResult) {
if compareResult == Equal {
return
}

numByState := map[string]int{}
oldestTimestampByState := map[string]int64{}

// Initialised to zero so we emit zero-metrics (instead of not emitting anything)
// Initialized to zero so we emit zero-metrics (instead of not emitting anything)
for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String()} {
numByState[s] = 0
oldestTimestampByState[s] = 0
Expand All @@ -601,6 +586,17 @@ func (r *Ring) updateRingMetrics() {
for state, timestamp := range oldestTimestampByState {
r.oldestTimestampGaugeVec.WithLabelValues(state).Set(float64(timestamp))
}

if compareResult == EqualButStatesAndTimestamps {
return
}

numTokens, ownedRange := r.countTokens()
for id, totalOwned := range ownedRange {
r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32))
r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id]))
}

r.totalTokensGauge.Set(float64(len(r.ringTokens)))
}

Expand Down
67 changes: 67 additions & 0 deletions ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -81,6 +82,72 @@ func generateKeys(r *rand.Rand, numTokens int, dest []uint32) {
}
}

// BenchmarkUpdateRingState runs benchmarkUpdateRingState as a sub-benchmark for
// every combination of instance count, zone count, tokens per instance and
// update mode (state-only change vs. token change).
func BenchmarkUpdateRingState(b *testing.B) {
	type scenario struct {
		instances    int
		zones        int
		tokens       int
		updateTokens bool
	}

	// Enumerate the full matrix up front, in the same order the sub-benchmarks are run.
	var scenarios []scenario
	for _, instances := range []int{50, 100, 500} {
		for _, zones := range []int{1, 3} {
			for _, tokens := range []int{128, 256, 512} {
				for _, update := range []bool{false, true} {
					scenarios = append(scenarios, scenario{
						instances:    instances,
						zones:        zones,
						tokens:       tokens,
						updateTokens: update,
					})
				}
			}
		}
	}

	for _, sc := range scenarios {
		name := fmt.Sprintf("num instances = %d, num zones = %d, num tokens = %d, update tokens = %t", sc.instances, sc.zones, sc.tokens, sc.updateTokens)
		b.Run(name, func(b *testing.B) {
			benchmarkUpdateRingState(b, sc.instances, sc.zones, sc.tokens, sc.updateTokens)
		})
	}
}

func benchmarkUpdateRingState(b *testing.B, numInstances, numZones, numTokens int, updateTokens bool) {
cfg := Config{
KVStore: kv.Config{},
HeartbeatTimeout: 0, // get healthy stats
ReplicationFactor: 3,
ZoneAwarenessEnabled: true,
}

// create the ring to set up metrics, but do not start
registry := prometheus.NewRegistry()
ring, err := NewWithStoreClientAndStrategy(cfg, testRingName, testRingKey, nil, NewDefaultReplicationStrategy(), registry, log.NewNopLogger())
require.NoError(b, err)

// Make a random ring with N instances, and M tokens per ingests
// Also make a copy with different timestamps and one with different tokens
desc := NewDesc()
otherDesc := NewDesc()
takenTokens := []uint32{}
otherTakenTokens := []uint32{}
for i := 0; i < numInstances; i++ {
tokens := GenerateTokens(numTokens, takenTokens)
takenTokens = append(takenTokens, tokens...)
now := time.Now()
id := fmt.Sprintf("%d", i)
desc.AddIngester(id, fmt.Sprintf("instance-%d", i), strconv.Itoa(i), tokens, ACTIVE, now)
if updateTokens {
otherTokens := GenerateTokens(numTokens, otherTakenTokens)
otherTakenTokens = append(otherTakenTokens, otherTokens...)
otherDesc.AddIngester(id, fmt.Sprintf("instance-%d", i), strconv.Itoa(i), otherTokens, ACTIVE, now)
} else {
otherDesc.AddIngester(id, fmt.Sprintf("instance-%d", i), strconv.Itoa(i), tokens, JOINING, now)
}
}

if updateTokens {
require.Equal(b, Different, desc.RingCompare(otherDesc))
} else {
require.Equal(b, EqualButStatesAndTimestamps, desc.RingCompare(otherDesc))
}

flipFlop := true
b.ResetTimer()
for n := 0; n < b.N; n++ {
if flipFlop {
ring.updateRingState(desc)
} else {
ring.updateRingState(otherDesc)
}
flipFlop = !flipFlop
}
}

func TestDoBatchZeroInstances(t *testing.T) {
ctx := context.Background()
numKeys := 10
Expand Down

0 comments on commit 87bd95f

Please sign in to comment.