Skip to content

Commit

Permalink
Thread saturation metrics
Browse files Browse the repository at this point in the history
Adds metrics suggested in #488, to record the percentage of time the
main and FSM goroutines are busy with work vs available to accept new
work, to give operators an idea of how close they are to hitting
capacity limits.

We keep 256 samples in memory for each metric, and update gauges (at
most) once a second, possibly less if the goroutines are idle. This
should be ok because it's unlikely that a goroutine would go from very
high saturation to being completely idle (so at worst we'll leave the
gauge on the previous low value).
  • Loading branch information
boxofrad committed Apr 27, 2022
1 parent 7fc5a41 commit 8ac5ab2
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 8 deletions.
6 changes: 6 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ type Raft struct {

// followerNotifyCh is used to tell followers that config has changed
followerNotifyCh chan struct{}

// thread saturation metric recorders.
mainThreadSaturation *saturationMetric
fsmThreadSaturation *saturationMetric
}

// BootstrapCluster initializes a server's storage with the given cluster
Expand Down Expand Up @@ -553,6 +557,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
leaderNotifyCh: make(chan struct{}, 1),
followerNotifyCh: make(chan struct{}, 1),
mainThreadSaturation: newSaturationMetric([]string{"raft", "mainThreadSaturation"}, 1*time.Second),
fsmThreadSaturation: newSaturationMetric([]string{"raft", "fsm", "threadSaturation"}, 1*time.Second),
}

r.conf.Store(*conf)
Expand Down
6 changes: 6 additions & 0 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,12 @@ func (r *Raft) runFSM() {
}

for {
r.fsmThreadSaturation.sleeping()

select {
case ptr := <-r.fsmMutateCh:
r.fsmThreadSaturation.working()

switch req := ptr.(type) {
case []*commitTuple:
applyBatch(req)
Expand All @@ -238,6 +242,8 @@ func (r *Raft) runFSM() {
}

case req := <-r.fsmSnapshotCh:
r.fsmThreadSaturation.working()

snapshot(req)

case <-r.shutdownCh:
Expand Down
52 changes: 44 additions & 8 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,35 +159,45 @@ func (r *Raft) runFollower() {
heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout)

for r.getState() == Follower {
r.mainThreadSaturation.sleeping()

select {
case rpc := <-r.rpcCh:
r.mainThreadSaturation.working()
r.processRPC(rpc)

case c := <-r.configurationChangeCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
c.respond(ErrNotLeader)

case a := <-r.applyCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
a.respond(ErrNotLeader)

case v := <-r.verifyCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

case r := <-r.userRestoreCh:
case ur := <-r.userRestoreCh:
r.mainThreadSaturation.working()
// Reject any restores since we are not the leader
r.respond(ErrNotLeader)
ur.respond(ErrNotLeader)

case r := <-r.leadershipTransferCh:
case l := <-r.leadershipTransferCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
r.respond(ErrNotLeader)
l.respond(ErrNotLeader)

case c := <-r.configurationsCh:
r.mainThreadSaturation.working()
c.configurations = r.configurations.Clone()
c.respond(nil)

case b := <-r.bootstrapCh:
r.mainThreadSaturation.working()
b.respond(r.liveBootstrap(b.configuration))

case <-r.leaderNotifyCh:
Expand All @@ -197,6 +207,7 @@ func (r *Raft) runFollower() {
heartbeatTimer = time.After(0)

case <-heartbeatTimer:
r.mainThreadSaturation.working()
// Restart the heartbeat timer
hbTimeout := r.config().HeartbeatTimeout
heartbeatTimer = randomTimeout(hbTimeout)
Expand Down Expand Up @@ -290,11 +301,15 @@ func (r *Raft) runCandidate() {
r.logger.Debug("votes", "needed", votesNeeded)

for r.getState() == Candidate {
r.mainThreadSaturation.sleeping()

select {
case rpc := <-r.rpcCh:
r.mainThreadSaturation.working()
r.processRPC(rpc)

case vote := <-voteCh:
r.mainThreadSaturation.working()
// Check if the term is greater than ours, bail
if vote.Term > r.getCurrentTerm() {
r.logger.Debug("newer term discovered, fallback to follower")
Expand All @@ -318,30 +333,37 @@ func (r *Raft) runCandidate() {
}

case c := <-r.configurationChangeCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
c.respond(ErrNotLeader)

case a := <-r.applyCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
a.respond(ErrNotLeader)

case v := <-r.verifyCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)

case r := <-r.userRestoreCh:
case ur := <-r.userRestoreCh:
r.mainThreadSaturation.working()
// Reject any restores since we are not the leader
r.respond(ErrNotLeader)
ur.respond(ErrNotLeader)

case r := <-r.leadershipTransferCh:
case l := <-r.leadershipTransferCh:
r.mainThreadSaturation.working()
// Reject any operations since we are not the leader
r.respond(ErrNotLeader)
l.respond(ErrNotLeader)

case c := <-r.configurationsCh:
r.mainThreadSaturation.working()
c.configurations = r.configurations.Clone()
c.respond(nil)

case b := <-r.bootstrapCh:
r.mainThreadSaturation.working()
b.respond(ErrCantBootstrap)

case <-r.leaderNotifyCh:
Expand All @@ -354,6 +376,7 @@ func (r *Raft) runCandidate() {
}

case <-electionTimer:
r.mainThreadSaturation.working()
// Election failed! Restart the election. We simply return,
// which will kick us back into runCandidate
r.logger.Warn("Election timeout reached, restarting election")
Expand Down Expand Up @@ -598,14 +621,19 @@ func (r *Raft) leaderLoop() {
lease := time.After(r.config().LeaderLeaseTimeout)

for r.getState() == Leader {
r.mainThreadSaturation.sleeping()

select {
case rpc := <-r.rpcCh:
r.mainThreadSaturation.working()
r.processRPC(rpc)

case <-r.leaderState.stepDown:
r.mainThreadSaturation.working()
r.setState(Follower)

case future := <-r.leadershipTransferCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
Expand Down Expand Up @@ -686,6 +714,7 @@ func (r *Raft) leaderLoop() {
go r.leadershipTransfer(*id, *address, state, stopCh, doneCh)

case <-r.leaderState.commitCh:
r.mainThreadSaturation.working()
// Process the newly committed entries
oldCommitIndex := r.getCommitIndex()
commitIndex := r.leaderState.commitment.getCommitIndex()
Expand Down Expand Up @@ -748,6 +777,7 @@ func (r *Raft) leaderLoop() {
}

case v := <-r.verifyCh:
r.mainThreadSaturation.working()
if v.quorumSize == 0 {
// Just dispatched, start the verification
r.verifyLeader(v)
Expand All @@ -772,6 +802,7 @@ func (r *Raft) leaderLoop() {
}

case future := <-r.userRestoreCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
Expand All @@ -781,6 +812,7 @@ func (r *Raft) leaderLoop() {
future.respond(err)

case future := <-r.configurationsCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
Expand All @@ -790,6 +822,7 @@ func (r *Raft) leaderLoop() {
future.respond(nil)

case future := <-r.configurationChangeChIfStable():
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
Expand All @@ -798,9 +831,11 @@ func (r *Raft) leaderLoop() {
r.appendConfigurationEntry(future)

case b := <-r.bootstrapCh:
r.mainThreadSaturation.working()
b.respond(ErrCantBootstrap)

case newLog := <-r.applyCh:
r.mainThreadSaturation.working()
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
newLog.respond(ErrLeadershipTransferInProgress)
Expand Down Expand Up @@ -829,6 +864,7 @@ func (r *Raft) leaderLoop() {
}

case <-lease:
r.mainThreadSaturation.working()
// Check if we've exceeded the lease, potentially stepping down
maxDiff := r.checkLeaderLease()

Expand Down
139 changes: 139 additions & 0 deletions saturation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package raft

import (
"math"
"time"

"github.com/armon/go-metrics"
)

// saturationMetric measures the saturation (percentage of time spent working vs
// waiting for work) of an event processing loop, such as runFSM. It reports the
// saturation as a gauge metric (at most) once every reportInterval.
//
// Callers must instrument their loop with calls to sleeping and working, starting
// with a call to sleeping.
//
// Note: it is expected that the caller is single-threaded and is not safe for
// concurrent use by multiple goroutines.
type saturationMetric struct {
// reportInterval is the maximum frequency at which the gauge will be
// updated (it may be less frequent if the caller is idle).
reportInterval time.Duration

// index of the current sample. We rely on it wrapping-around on overflow and
// underflow, to implement a circular buffer (note the matching size of the
// samples array).
index uint8

// samples is a fixed-size array of samples (similar to a circular buffer)
// where elements at even-numbered indexes contain time spent sleeping, and
// elements at odd-numbered indexes contain time spent working.
samples [math.MaxUint8 + 1]struct {
v time.Duration // measurement
t time.Time // time at which the measurement was captured
}

sleepBegan, workBegan time.Time
lastReport time.Time

// These are overwritten in tests.
nowFn func() time.Time
reportFn func(float32)
}

// newSaturationMetric creates a saturationMetric that will update the gauge
// with the given name at the given reportInterval.
func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric {
return &saturationMetric{
reportInterval: reportInterval,
lastReport: time.Now(), // Set to now to avoid reporting immediately.
nowFn: time.Now,
reportFn: func(sat float32) { metrics.SetGauge(name, sat) },
}
}

// sleeping records the time at which the caller began waiting for work. After
// the initial call it must always be proceeded by a call to working.
func (s *saturationMetric) sleeping() {
s.sleepBegan = s.nowFn()

// Caller should've called working (we've probably missed a branch of a
// select). Reset sleepBegan without recording a sample to "lose" time
// instead of recording nonsense data.
if s.index%2 == 1 {
return
}

if !s.workBegan.IsZero() {
sample := &s.samples[s.index-1]
sample.v = s.sleepBegan.Sub(s.workBegan)
sample.t = s.sleepBegan
}

s.index++
s.report()
}

// working records the time at which the caller began working. It must always
// be proceeded by a call to sleeping.
func (s *saturationMetric) working() {
s.workBegan = s.nowFn()

// Caller should've called sleeping. Reset workBegan without recording a
// sample to "lose" time instead of recording nonsense data.
if s.index%2 == 0 {
return
}

sample := &s.samples[s.index-1]
sample.v = s.workBegan.Sub(s.sleepBegan)
sample.t = s.workBegan

s.index++
s.report()
}

// report updates the gauge if reportInterval has passed since our last report.
func (s *saturationMetric) report() {
if s.nowFn().Sub(s.lastReport) < s.reportInterval {
return
}

workSamples := make([]time.Duration, 0, len(s.samples)/2)
sleepSamples := make([]time.Duration, 0, len(s.samples)/2)

for idx, sample := range s.samples {
if !sample.t.After(s.lastReport) {
continue
}

if idx%2 == 0 {
sleepSamples = append(sleepSamples, sample.v)
} else {
workSamples = append(workSamples, sample.v)
}
}

// Ensure we take an equal number of work and sleep samples to avoid
// over/under reporting.
take := len(workSamples)
if len(sleepSamples) < take {
take = len(sleepSamples)
}

var saturation float32
if take != 0 {
var work, sleep time.Duration
for _, s := range workSamples[0:take] {
work += s
}
for _, s := range sleepSamples[0:take] {
sleep += s
}
saturation = float32(work) / float32(work+sleep)
}

s.reportFn(saturation)
s.lastReport = s.nowFn()
}
Loading

0 comments on commit 8ac5ab2

Please sign in to comment.