Skip to content

Commit

Permalink
raft: remove quorum() dependency from readOnly
Browse files Browse the repository at this point in the history
This now delegates the quorum computation to r.prs, which will allow
it to generalize in a straightforward way when #7625 is
addressed.
  • Loading branch information
tbg committed Apr 26, 2019
1 parent 8dbd0e3 commit 9011fbb
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
4 changes: 4 additions & 0 deletions raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ func (p *prs) quorum() int {
return len(p.nodes)/2 + 1
}

func (p *prs) hasQuorum(m map[uint64]struct{}) bool {
return len(m) >= p.quorum()
}

// committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
func (p *prs) committed() uint64 {
Expand Down
5 changes: 3 additions & 2 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,8 @@ func stepLeader(r *raft, m pb.Message) error {
switch r.readOnly.option {
case ReadOnlySafe:
r.readOnly.addRequest(r.raftLog.committed, m)
// The local node automatically acks the request.
r.readOnly.recvAck(r.id, m.Entries[0].Data)
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
ri := r.raftLog.committed
Expand Down Expand Up @@ -1096,8 +1098,7 @@ func stepLeader(r *raft, m pb.Message) error {
return nil
}

ackCount := r.readOnly.recvAck(m)
if ackCount < r.prs.quorum() {
if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) {
return nil
}

Expand Down
23 changes: 12 additions & 11 deletions raft/read_only.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package raft

import pb "go.etcd.io/etcd/raft/raftpb"
import (
pb "go.etcd.io/etcd/raft/raftpb"
)

// ReadState provides state for read only query.
// It's caller's responsibility to call ReadIndex first before getting
Expand Down Expand Up @@ -50,26 +52,25 @@ func newReadOnly(option ReadOnlyOption) *readOnly {
// the read only request.
// `m` is the original read only request message from the local or remote node.
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
ctx := string(m.Entries[0].Data)
if _, ok := ro.pendingReadIndex[ctx]; ok {
s := string(m.Entries[0].Data)
if _, ok := ro.pendingReadIndex[s]; ok {
return
}
ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
ro.readIndexQueue = append(ro.readIndexQueue, ctx)
ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
ro.readIndexQueue = append(ro.readIndexQueue, s)
}

// recvAck notifies the readonly struct that the raft state machine received
// an acknowledgment of the heartbeat that attached with the read only request
// context.
func (ro *readOnly) recvAck(m pb.Message) int {
rs, ok := ro.pendingReadIndex[string(m.Context)]
func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} {
rs, ok := ro.pendingReadIndex[string(context)]
if !ok {
return 0
return nil
}

rs.acks[m.From] = struct{}{}
// add one to include an ack from local node
return len(rs.acks) + 1
rs.acks[id] = struct{}{}
return rs.acks
}

// advance advances the read only request queue kept by the readonly struct.
Expand Down

0 comments on commit 9011fbb

Please sign in to comment.