Skip to content

Commit

Permalink
Request stats with future over new statsCh
Browse files Browse the repository at this point in the history
For hashicorp#167: Make the stats into a real structure
  • Loading branch information
ongardie committed Oct 17, 2016
1 parent 2b8b7e7 commit afb9383
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 48 deletions.
58 changes: 12 additions & 46 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ type Raft struct {
// data safely from outside of the main thread.
membershipsCh chan *membershipsFuture

// statsCh is used to get stats safely from outside of the main thread.
statsCh chan *statsFuture

// bootstrapCh is used to attempt an initial bootstrap from outside of
// the main thread.
bootstrapCh chan *bootstrapFuture
Expand Down Expand Up @@ -459,6 +462,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
trans: trans,
verifyCh: make(chan *verifyFuture, 64),
membershipsCh: make(chan *membershipsFuture, 8),
statsCh: make(chan *statsFuture, 8),
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
}
Expand Down Expand Up @@ -921,53 +925,15 @@ func (s *Stats) String() string {
}

// Stats returns various internal stats.
func (r *Raft) Stats() *Stats {
lastLogIndex, lastLogTerm := r.shared.getLastLog()
lastSnapIndex, lastSnapTerm := r.shared.getLastSnapshot()
s := &Stats{
State: r.shared.getState(),
Term: r.shared.getCurrentTerm(),
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
CommitIndex: r.shared.getCommitIndex(),
AppliedIndex: r.shared.getLastApplied(),
FSMPending: len(r.fsmCommitCh),
LastSnapshotIndex: lastSnapIndex,
LastSnapshotTerm: lastSnapTerm,
LastContact: r.LastContact(),
ProtocolVersion: r.protocolVersion,
ProtocolVersionMin: ProtocolVersionMin,
ProtocolVersionMax: ProtocolVersionMax,
SnapshotVersionMin: SnapshotVersionMin,
SnapshotVersionMax: SnapshotVersionMax,
}

future := r.GetMembership()
if err := future.Error(); err != nil {
r.logger.Warn("could not get configuration for Stats: %v", err)
} else {
membership := future.Membership()
s.LatestMembershipIndex = future.Index()
s.LatestMembership = membership

// This is a legacy metric that we've seen people use in the wild.
hasUs := false
numPeers := 0
for _, server := range membership.Servers {
if server.Suffrage == Voter {
if server.ID == r.localID {
hasUs = true
} else {
numPeers++
}
}
}
if !hasUs {
numPeers = 0
}
s.NumPeers = numPeers
func (r *Raft) Stats() StatsFuture {
statsReq := &statsFuture{}
statsReq.init()
select {
case <-r.shutdownCh:
statsReq.respond(ErrRaftShutdown)
case r.statsCh <- statsReq:
}
return s
return statsReq
}

// LastIndex returns the last index in stable storage,
Expand Down
8 changes: 6 additions & 2 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import "testing"

func TestAPI_Stats(t *testing.T) {
c := MakeCluster(1, t, nil)
s := c.rafts[0].Stats()
future := c.rafts[0].Stats()
if err := future.Error(); err != nil {
c.FailNowf("[ERR] Stats() returned err %v", err)
}
s := future.Stats()
if s.LastLogTerm != 1 {
c.FailNowf("[ERR] err: stats.LastLogTerm expected 1, got %v", s.LastLogTerm)
c.FailNowf("[ERR] stats.LastLogTerm expected 1, got %v", s.LastLogTerm)
}
c.Close()
}
16 changes: 16 additions & 0 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,19 @@ func (a *appendFuture) Request() *AppendEntriesRequest {
func (a *appendFuture) Response() *AppendEntriesResponse {
return a.resp
}

type StatsFuture interface {
Future
// Stats returns variuos bits of internal information. This must
// not be called until after the Error method has returned.
Stats() *Stats
}

type statsFuture struct {
deferError
stats *Stats
}

func (s *statsFuture) Stats() *Stats {
return s.stats
}
59 changes: 59 additions & 0 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ func (r *Raft) runFollower() {
c.memberships = r.memberships.Clone()
c.respond(nil)

case f := <-r.statsCh:
f.stats = r.stats()
f.respond(nil)

case b := <-r.bootstrapCh:
b.respond(r.liveBootstrap(b.membership))

Expand Down Expand Up @@ -296,6 +300,10 @@ func (r *Raft) runCandidate() {
c.memberships = r.memberships.Clone()
c.respond(nil)

case f := <-r.statsCh:
f.stats = r.stats()
f.respond(nil)

case b := <-r.bootstrapCh:
b.respond(ErrCantBootstrap)

Expand Down Expand Up @@ -543,6 +551,10 @@ func (r *Raft) leaderLoop() {
case future := <-r.membershipChangeChIfStable():
r.appendMembershipEntry(future)

case f := <-r.statsCh:
f.stats = r.stats()
f.respond(nil)

case b := <-r.bootstrapCh:
b.respond(ErrCantBootstrap)

Expand Down Expand Up @@ -1399,3 +1411,50 @@ func (r *Raft) setState(state RaftState) {
r.observe(state)
}
}

// Fills in stats when requested by application.
// Must only be called from the main Raft goroutine.
func (r *Raft) stats() *Stats {
lastLogIndex, lastLogTerm := r.shared.getLastLog()
lastSnapIndex, lastSnapTerm := r.shared.getLastSnapshot()
s := &Stats{
State: r.shared.getState(),
Term: r.shared.getCurrentTerm(),
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
CommitIndex: r.shared.getCommitIndex(),
AppliedIndex: r.shared.getLastApplied(),
FSMPending: len(r.fsmCommitCh),
LastSnapshotIndex: lastSnapIndex,
LastSnapshotTerm: lastSnapTerm,
LastContact: r.LastContact(),
ProtocolVersion: r.protocolVersion,
ProtocolVersionMin: ProtocolVersionMin,
ProtocolVersionMax: ProtocolVersionMax,
SnapshotVersionMin: SnapshotVersionMin,
SnapshotVersionMax: SnapshotVersionMax,
}

membership := r.memberships.latest
s.LatestMembershipIndex = r.memberships.latestIndex
s.LatestMembership = membership

// This is a legacy metric that we've seen people use in the wild.
hasUs := false
numPeers := 0
for _, server := range membership.Servers {
if server.Suffrage == Voter {
if server.ID == r.localID {
hasUs = true
} else {
numPeers++
}
}
}
if !hasUs {
numPeers = 0
}
s.NumPeers = numPeers

return s
}

0 comments on commit afb9383

Please sign in to comment.