Skip to content

Commit

Permalink
ring/ring.go: update metrics on ring changes and not periodically
Browse files Browse the repository at this point in the history
When writing e2e tests for services using Ring, it is natural to wait
for having all tokens assigned. With the default 10s interval for updating
metrics, this slows down tests considerably. This was not the case before
PR 50 where metrics were collected on request.

This commit moves updateMetrics into updateRingState, but also optimizes
away updates to ring token ownership metrics when only state or timestamps
have changed and skips the metrics entirely when nothing changed.
The reasoning is that changes to the ring tokens are infrequent so we
can afford a relatively heavy operation.

Signed-off-by: György Krajcsovits <[email protected]>
  • Loading branch information
krajorama committed Dec 29, 2021
1 parent 92f34b5 commit 09cfb8f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
* [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
* [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97
* [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95
* [ENHANCEMENT] Trigger metrics update on ring changes instead of doing it periodically to speed up tests that wait for certain metrics. #107
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
46 changes: 21 additions & 25 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,21 +285,9 @@ func (r *Ring) starting(ctx context.Context) error {

func (r *Ring) loop(ctx context.Context) error {
// Update the ring metrics at start of the main loop.
r.updateRingMetrics()
go func() {
// Start metrics update ticker to update the ring metrics.
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
r.updateRingMetrics()
case <-ctx.Done():
return
}
}
}()
r.mtx.Lock()
r.updateRingMetrics(Different)
r.mtx.Unlock()

r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool {
if value == nil {
Expand Down Expand Up @@ -334,6 +322,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
// when watching the ring for updates).
r.mtx.Lock()
r.ringDesc = ringDesc
r.updateRingMetrics(rc)
r.mtx.Unlock()
return
}
Expand All @@ -356,6 +345,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
// Invalidate all cached subrings.
r.shuffledSubringCache = make(map[subringCacheKey]*Ring)
}
r.updateRingMetrics(rc)
}

// Get returns n (or more) instances which form the replicas for the given key.
Expand Down Expand Up @@ -564,21 +554,16 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
return numTokens, owned
}

// updateRingMetrics updates ring metrics.
func (r *Ring) updateRingMetrics() {
r.mtx.RLock()
defer r.mtx.RUnlock()

numTokens, ownedRange := r.countTokens()
for id, totalOwned := range ownedRange {
r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32))
r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id]))
// updateRingMetrics updates ring metrics. Caller must be holding at least a Read lock!
func (r *Ring) updateRingMetrics(compareResult CompareResult) {
if compareResult == Equal {
return
}

numByState := map[string]int{}
oldestTimestampByState := map[string]int64{}

// Initialised to zero so we emit zero-metrics (instead of not emitting anything)
// Initialized to zero so we emit zero-metrics (instead of not emitting anything)
for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String()} {
numByState[s] = 0
oldestTimestampByState[s] = 0
Expand All @@ -601,6 +586,17 @@ func (r *Ring) updateRingMetrics() {
for state, timestamp := range oldestTimestampByState {
r.oldestTimestampGaugeVec.WithLabelValues(state).Set(float64(timestamp))
}

if compareResult == EqualButStatesAndTimestamps {
return
}

numTokens, ownedRange := r.countTokens()
for id, totalOwned := range ownedRange {
r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32))
r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id]))
}

r.totalTokensGauge.Set(float64(len(r.ringTokens)))
}

Expand Down

0 comments on commit 09cfb8f

Please sign in to comment.