Skip to content

Commit

Permalink
Limit num pending proposals in apply channel (#3340)
Browse files Browse the repository at this point in the history
Add a ramp meter to avoid pushing too many big proposals to apply channel to prevent OOM.

Changes:
* Apply ramp meter before considering current proposal
* Add a log to warn if pending proposals exceeds the limit by two.
  • Loading branch information
manishrjain authored Apr 30, 2019
1 parent 5794bd6 commit 4d53eb8
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type node struct {

canCampaign bool
elog trace.EventLog

pendingSize int64
}

// Now that we apply txn updates via Raft, waiting based on Txn timestamps is
Expand Down Expand Up @@ -442,10 +444,12 @@ func (n *node) processApplyCh() {

// This function must be run serially.
handle := func(proposals []*pb.Proposal) {
var totalSize int64
for _, proposal := range proposals {
// We use the size as a double check to ensure that we're
// working with the same proposal as before.
psz := proposal.Size()
totalSize += int64(psz)

var perr error
p, ok := previous[proposal.Key]
Expand Down Expand Up @@ -482,6 +486,9 @@ func (n *node) processApplyCh() {
n.Proposals.Done(proposal.Key, perr)
n.Applied.Done(proposal.Index)
}
if sz := atomic.AddInt64(&n.pendingSize, -totalSize); sz < 0 {
glog.Warningf("Pending size should remain above zero: %d", sz)
}
}

maxAge := 10 * time.Minute
Expand Down Expand Up @@ -646,6 +653,22 @@ func (n *node) proposeSnapshot(discardN int) error {
return n.Raft().Propose(n.ctx, data)
}

const maxPendingSize int64 = 64 << 20 // in bytes.

func (n *node) rampMeter() {
start := time.Now()
defer func() {
if dur := time.Since(start); dur > time.Second {
glog.Infof("Blocked pushing to applyCh for %v", dur.Round(time.Millisecond))
}
}()
for {
if atomic.LoadInt64(&n.pendingSize) <= maxPendingSize {
return
}
time.Sleep(3 * time.Millisecond)
}
}
func (n *node) Run() {
defer n.closer.Done() // CLOSER:1

Expand Down Expand Up @@ -822,6 +845,17 @@ func (n *node) Run() {
}
// Send the whole lot to applyCh in one go, instead of sending proposals one by one.
if len(proposals) > 0 {
// Apply the meter this before adding size to pending size so some crazy big
// proposal can be pushed to applyCh. If this do this after adding its size to
// pending size, we could block forever in rampMeter.
n.rampMeter()
var pendingSize int64
for _, p := range proposals {
pendingSize += int64(p.Size())
}
if sz := atomic.AddInt64(&n.pendingSize, pendingSize); sz > 2*maxPendingSize {
glog.Warningf("Inflight proposal size: %d. There would be some throttling.", sz)
}
n.applyCh <- proposals
}

Expand Down

0 comments on commit 4d53eb8

Please sign in to comment.