Skip to content

Commit

Permalink
ring/ring.go: reactive update of metrics
Browse files Browse the repository at this point in the history
When writing e2e tests for services using Ring, it is natural to wait
for having all tokens assigned. With the default 10s interval for updating
metrics, this slows down tests considerably. This was not the case before
PR 50 where metrics were collected on request.

This commit allows the update of metrics when there is a change in the
ring, i.e. when KV.Watch() is triggered. To not hold up watching and also
skip through high frequency changes, the metrics update is decoupled from
 watch via a channel.

A further optimization is to not hold the lock while calculating the metrics,
just copy the Ring.ringDesc pointer which points to a constant state.

Signed-off-by: György Krajcsovits <[email protected]>
  • Loading branch information
krajorama committed Dec 28, 2021
1 parent 92f34b5 commit df7e12b
Showing 1 changed file with 56 additions and 10 deletions.
66 changes: 56 additions & 10 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,17 @@ func (r *Ring) starting(ctx context.Context) error {
func (r *Ring) loop(ctx context.Context) error {
// Update the ring metrics at start of the main loop.
r.updateRingMetrics()

metricChan := make(chan struct{}, 1)
go func() {
// Start metrics update ticker to update the ring metrics.
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-metricChan:
r.updateRingMetrics()
case <-ticker.C:
r.updateRingMetrics()
case <-ctx.Done():
Expand All @@ -307,13 +311,20 @@ func (r *Ring) loop(ctx context.Context) error {
return true
}

r.updateRingState(value.(*Desc))
rc := r.updateRingState(value.(*Desc))
if rc != Equal {
select {
case metricChan <- struct{}{}:
default:
}
}

return true
})
return nil
}

func (r *Ring) updateRingState(ringDesc *Desc) {
func (r *Ring) updateRingState(ringDesc *Desc) CompareResult {
r.mtx.RLock()
prevRing := r.ringDesc
r.mtx.RUnlock()
Expand All @@ -335,7 +346,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
r.mtx.Lock()
r.ringDesc = ringDesc
r.mtx.Unlock()
return
return rc
}

now := time.Now()
Expand All @@ -356,6 +367,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
// Invalidate all cached subrings.
r.shuffledSubringCache = make(map[subringCacheKey]*Ring)
}
return rc
}

// Get returns n (or more) instances which form the replicas for the given key.
Expand Down Expand Up @@ -564,12 +576,46 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
return numTokens, owned
}

// updateRingMetrics updates ring metrics.
// countTokens returns the number of tokens and tokens within the range for each instance.
func countTokensUtil(ringDesc *Desc) (map[string]uint32, map[string]uint32, int) {
ringTokens := ringDesc.GetTokens()
ringInstanceByToken := ringDesc.getTokensInfo()
owned := map[string]uint32{}
numTokens := map[string]uint32{}
for i, token := range ringTokens {
var diff uint32

// Compute how many tokens are within the range.
if i+1 == len(ringTokens) {
diff = (math.MaxUint32 - token) + ringTokens[0]
} else {
diff = ringTokens[i+1] - token
}

info := ringInstanceByToken[token]
numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1
owned[info.InstanceID] = owned[info.InstanceID] + diff
}

// Set to 0 the number of owned tokens by instances which don't have tokens yet.
for id := range ringDesc.Ingesters {
if _, ok := owned[id]; !ok {
owned[id] = 0
numTokens[id] = 0
}
}

return numTokens, owned, len(ringTokens)
}

func (r *Ring) updateRingMetrics() {
r.mtx.RLock()
defer r.mtx.RUnlock()
r.mtx.Lock()
// There is no copy here as the ringDesc object is not modified over time,
// just replaced.
ringDesc := r.ringDesc
r.mtx.Unlock()

numTokens, ownedRange := r.countTokens()
numTokens, ownedRange, ringTokenCount := countTokensUtil(ringDesc)
for id, totalOwned := range ownedRange {
r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32))
r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id]))
Expand All @@ -578,13 +624,13 @@ func (r *Ring) updateRingMetrics() {
numByState := map[string]int{}
oldestTimestampByState := map[string]int64{}

// Initialised to zero so we emit zero-metrics (instead of not emitting anything)
// Initialized to zero so we emit zero-metrics (instead of not emitting anything)
for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String()} {
numByState[s] = 0
oldestTimestampByState[s] = 0
}

for _, instance := range r.ringDesc.Ingesters {
for _, instance := range ringDesc.Ingesters {
s := instance.State.String()
if !r.IsHealthy(&instance, Reporting, time.Now()) {
s = unhealthy
Expand All @@ -601,7 +647,7 @@ func (r *Ring) updateRingMetrics() {
for state, timestamp := range oldestTimestampByState {
r.oldestTimestampGaugeVec.WithLabelValues(state).Set(float64(timestamp))
}
r.totalTokensGauge.Set(float64(len(r.ringTokens)))
r.totalTokensGauge.Set(float64(ringTokenCount))
}

// ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)
Expand Down

0 comments on commit df7e12b

Please sign in to comment.