From 4d53eb8cab49753bf45756e81ebceb4e9998509b Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 29 Apr 2019 19:36:29 -0700 Subject: [PATCH] Limit num pending proposals in apply channel (#3340) Add a ramp meter to avoid pushing too many big proposals to apply channel to prevent OOM. Changes: * Apply ramp meter before considering current proposal * Add a log to warn if pending proposals exceeds the limit by two. --- worker/draft.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/worker/draft.go b/worker/draft.go index 54595fd76ff..ba76ee8dd9e 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -68,6 +68,8 @@ type node struct { canCampaign bool elog trace.EventLog + + pendingSize int64 } // Now that we apply txn updates via Raft, waiting based on Txn timestamps is @@ -442,10 +444,12 @@ func (n *node) processApplyCh() { // This function must be run serially. handle := func(proposals []*pb.Proposal) { + var totalSize int64 for _, proposal := range proposals { // We use the size as a double check to ensure that we're // working with the same proposal as before. psz := proposal.Size() + totalSize += int64(psz) var perr error p, ok := previous[proposal.Key] @@ -482,6 +486,9 @@ func (n *node) processApplyCh() { n.Proposals.Done(proposal.Key, perr) n.Applied.Done(proposal.Index) } + if sz := atomic.AddInt64(&n.pendingSize, -totalSize); sz < 0 { + glog.Warningf("Pending size should remain above zero: %d", sz) + } } maxAge := 10 * time.Minute @@ -646,6 +653,22 @@ func (n *node) proposeSnapshot(discardN int) error { return n.Raft().Propose(n.ctx, data) } +const maxPendingSize int64 = 64 << 20 // in bytes. + +func (n *node) rampMeter() { + start := time.Now() + defer func() { + if dur := time.Since(start); dur > time.Second { + glog.Infof("Blocked pushing to applyCh for %v", dur.Round(time.Millisecond)) + } + }() + for { + if atomic.LoadInt64(&n.pendingSize) <= maxPendingSize { + return + } + time.Sleep(3 * time.Millisecond) + } +} func (n *node) Run() { defer n.closer.Done() // CLOSER:1 @@ -822,6 +845,17 @@ func (n *node) Run() { } // Send the whole lot to applyCh in one go, instead of sending proposals one by one. if len(proposals) > 0 { + // Apply the meter this before adding size to pending size so some crazy big + // proposal can be pushed to applyCh. If this do this after adding its size to + // pending size, we could block forever in rampMeter. + n.rampMeter() + var pendingSize int64 + for _, p := range proposals { + pendingSize += int64(p.Size()) + } + if sz := atomic.AddInt64(&n.pendingSize, pendingSize); sz > 2*maxPendingSize { + glog.Warningf("Inflight proposal size: %d. There would be some throttling.", sz) + } n.applyCh <- proposals }