Skip to content

Commit

Permalink
Avoid passing committed logs over channel to prevent deadlock possibi…
Browse files Browse the repository at this point in the history
…lity
  • Loading branch information
armon committed May 10, 2014
1 parent 2b35d21 commit 5800ad5
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 63 deletions.
53 changes: 36 additions & 17 deletions inflight.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package raft

import (
"container/list"
"net"
"sync"
)
Expand Down Expand Up @@ -40,7 +41,8 @@ func (m *majorityQuorum) IsCommitted() bool {
// Inflight is used to track operations that are still in-flight
type inflight struct {
sync.Mutex
commitCh chan *logFuture
committed *list.List
commitCh chan struct{}
minCommit uint64
maxCommit uint64
operations map[uint64]*logFuture
Expand All @@ -49,8 +51,9 @@ type inflight struct {

// NewInflight returns an inflight struct that notifies
// the provided channel when logs are finished commiting.
func newInflight(commitCh chan *logFuture) *inflight {
func newInflight(commitCh chan struct{}) *inflight {
return &inflight{
committed: list.New(),
commitCh: commitCh,
minCommit: 0,
maxCommit: 0,
Expand Down Expand Up @@ -91,9 +94,17 @@ func (i *inflight) Cancel(err error) {
op.respond(err)
}

// Clear all the committed but not processed
for e := i.committed.Front(); e != nil; e = e.Next() {
e.Value.(*logFuture).respond(err)
}

// Clear the map
i.operations = make(map[uint64]*logFuture)

// Clear the list of committed
i.committed = list.New()

// Close the commmitCh
close(i.commitCh)

Expand All @@ -102,6 +113,14 @@ func (i *inflight) Cancel(err error) {
i.maxCommit = 0
}

// Committed returns all the committed operations in order
func (i *inflight) Committed() (l *list.List) {
i.Lock()
l, i.committed = i.committed, list.New()
i.Unlock()
return l
}

// Commit is used by leader replication routines to indicate that
// a follower was finished commiting a log to disk.
func (i *inflight) Commit(index uint64, peer net.Addr) {
Expand Down Expand Up @@ -146,23 +165,20 @@ func (i *inflight) commit(index uint64, peer net.Addr) {
return
}

// Notify of commit
NOTIFY:
select {
case i.commitCh <- op:
// Stop tracking since it is committed
delete(i.operations, index)

// Update the indexes
if index == i.maxCommit {
i.minCommit = 0
i.maxCommit = 0

} else {
i.minCommit++
}
// Add the operation to the committed list
i.committed.PushBack(op)

// Stop tracking since it is committed
delete(i.operations, index)

case <-i.stopCh:
// Update the indexes
if index == i.maxCommit {
i.minCommit = 0
i.maxCommit = 0

} else {
i.minCommit++
}

// Check if the next in-flight operation is ready
Expand All @@ -173,4 +189,7 @@ NOTIFY:
goto NOTIFY
}
}

// Async notify of ready operations
asyncNotifyCh(i.commitCh)
}
53 changes: 22 additions & 31 deletions inflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func TestInflight_StartCommit(t *testing.T) {
commitCh := make(chan *logFuture, 1)
commitCh := make(chan struct{}, 1)
in := newInflight(commitCh)

// Commit a transaction as being in flight
Expand All @@ -16,23 +16,17 @@ func TestInflight_StartCommit(t *testing.T) {

// Commit 3 times
in.Commit(1, nil)
select {
case <-commitCh:
if in.Committed().Len() != 0 {
t.Fatalf("should not be commited")
default:
}

in.Commit(1, nil)
select {
case <-commitCh:
if in.Committed().Len() != 0 {
t.Fatalf("should not be commited")
default:
}

in.Commit(1, nil)
select {
case <-commitCh:
default:
if in.Committed().Len() != 1 {
t.Fatalf("should be commited")
}

Expand All @@ -41,7 +35,7 @@ func TestInflight_StartCommit(t *testing.T) {
}

func TestInflight_Cancel(t *testing.T) {
commitCh := make(chan *logFuture, 1)
commitCh := make(chan struct{}, 1)
in := newInflight(commitCh)

// Commit a transaction as being in flight
Expand All @@ -63,7 +57,7 @@ func TestInflight_Cancel(t *testing.T) {
}

func TestInflight_CommitRange(t *testing.T) {
commitCh := make(chan *logFuture, 3)
commitCh := make(chan struct{}, 1)
in := newInflight(commitCh)

// Commit a few transaction as being in flight
Expand All @@ -85,14 +79,14 @@ func TestInflight_CommitRange(t *testing.T) {
in.CommitRange(1, 10, nil)

// Should get 3 back
if len(commitCh) != 3 {
if in.Committed().Len() != 3 {
t.Fatalf("expected all 3 to commit")
}
}

// Should panic if we commit non contiguously!
func TestInflight_NonContiguous(t *testing.T) {
commitCh := make(chan *logFuture, 3)
commitCh := make(chan struct{}, 1)
in := newInflight(commitCh)

// Commit a few transaction as being in flight
Expand All @@ -108,31 +102,28 @@ func TestInflight_NonContiguous(t *testing.T) {
in.Commit(3, nil)
in.Commit(3, nil) // panic!

select {
case <-commitCh:
if in.Committed().Len() != 0 {
t.Fatalf("should not commit")
default:
}

in.Commit(2, nil)
in.Commit(2, nil)
in.Commit(2, nil) // panic!

select {
case l := <-commitCh:
if l.log.Index != 2 {
t.Fatalf("bad: %v", *l)
}
default:
t.Fatalf("should commit")
committed := in.Committed()
if committed.Len() != 2 {
t.Fatalf("should commit both")
}

select {
case l := <-commitCh:
if l.log.Index != 3 {
t.Fatalf("bad: %v", *l)
}
default:
t.Fatalf("should commit")
current := committed.Front()
l := current.Value.(*logFuture)
if l.log.Index != 2 {
t.Fatalf("bad: %v", *l)
}

current = current.Next()
l = current.Value.(*logFuture)
if l.log.Index != 3 {
t.Fatalf("bad: %v", *l)
}
}
30 changes: 15 additions & 15 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type commitTuple struct {

// leaderState is state that is used while we are a leader
type leaderState struct {
commitCh chan *logFuture
commitCh chan struct{}
inflight *inflight
replState map[string]*followerReplication
notify map[*verifyFuture]struct{}
Expand Down Expand Up @@ -632,7 +632,7 @@ func (r *Raft) runLeader() {
asyncNotifyBool(r.leaderCh, true)

// Setup leader state
r.leaderState.commitCh = make(chan *logFuture, 128)
r.leaderState.commitCh = make(chan struct{}, 1)
r.leaderState.inflight = newInflight(r.leaderState.commitCh)
r.leaderState.replState = make(map[string]*followerReplication)
r.leaderState.notify = make(map[*verifyFuture]struct{})
Expand All @@ -647,11 +647,6 @@ func (r *Raft) runLeader() {
// Cancel inflight requests
r.leaderState.inflight.Cancel(ErrLeadershipLost)

// Respond to any requests in the queue
for future := range r.leaderState.commitCh {
future.respond(ErrLeadershipLost)
}

// Respond to any pending verify requets
for future := range r.leaderState.notify {
future.respond(ErrLeadershipLost)
Expand Down Expand Up @@ -715,14 +710,19 @@ func (r *Raft) leaderLoop() {
case rpc := <-r.rpcCh:
r.processRPC(rpc)

case commitLog := <-r.leaderState.commitCh:
// Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)

// Increment the commit index
idx := commitLog.log.Index
r.setCommitIndex(idx)
r.processLogs(idx, commitLog)
case <-r.leaderState.commitCh:
// Get the committed messages
committed := r.leaderState.inflight.Committed()
for e := committed.Front(); e != nil; e = e.Next() {
// Measure the commit time
commitLog := e.Value.(*logFuture)
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)

// Increment the commit index
idx := commitLog.log.Index
r.setCommitIndex(idx)
r.processLogs(idx, commitLog)
}

case v := <-r.verifyCh:
if v.quorumSize == 0 {
Expand Down

0 comments on commit 5800ad5

Please sign in to comment.