Merge #71132
71132: kvserver: apply a limit to outgoing raft msg batching r=erikgrinaker a=tbg

In #71050, we saw evidence of very large (2.3+GiB) Raft messages being
sent on the stream, which overwhelmed both the sender and the receiver.
Raft messages are batched up before sending, so what must have happened
here is that a large number of reasonably sized messages (up to 64 MiB
each in this case, due to the `max_size` setting) were merged into one
giant blob. As of this commit, we apply a comparable limit, the new
`kv.raft.command.target_batch_size` setting (default 64 MiB), to the
batching step before sending messages as well.

Closes #71050.

Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
craig[bot] and tbg committed Oct 20, 2021
2 parents a787dd1 + 1d959cb commit e8dc696
Showing 1 changed file with 28 additions and 8 deletions.
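
The idea described in the commit message — accumulate queued requests into a
batch until a byte budget is exhausted — can be illustrated in isolation.
Below is a minimal, self-contained Go sketch; the request type and
batchFromChannel helper are hypothetical names for illustration, not the
actual kvserver code:

package main

import "fmt"

// request stands in for a queued Raft message; only its encoded size matters here.
type request struct {
	id   int
	size int64
}

// batchFromChannel pulls the first request off ch (assumed non-empty), then
// opportunistically drains more requests until the byte budget is exhausted
// or the channel is empty, mirroring the "batch until target size" idea.
func batchFromChannel(ch chan request, targetBatchSize int64) []request {
	req := <-ch
	budget := targetBatchSize - req.size
	batch := []request{req}
	for budget > 0 {
		select {
		case req := <-ch:
			budget -= req.size
			batch = append(batch, req)
		default:
			budget = -1 // channel drained; stop batching
		}
	}
	return batch
}

func main() {
	ch := make(chan request, 16)
	for i := 0; i < 10; i++ {
		ch <- request{id: i, size: 40 << 20} // ten 40 MiB messages queued up
	}
	// With a 64 MiB target, each batch closes as soon as it reaches or
	// exceeds the target, so batches stay small instead of growing into
	// a multi-GiB blob.
	for len(ch) > 0 {
		batch := batchFromChannel(ch, 64<<20)
		fmt.Printf("sent batch of %d message(s)\n", len(batch))
	}
}

As in the real change, the budget is only checked between pulls, so a batch
can overshoot the target by at most one message.
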
36 changes: 28 additions & 8 deletions pkg/kv/kvserver/raft_transport.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -53,6 +54,19 @@ const (
 	raftIdleTimeout = time.Minute
 )
 
+// targetRaftOutgoingBatchSize wraps "kv.raft.command.target_batch_size".
+var targetRaftOutgoingBatchSize = settings.RegisterByteSizeSetting(
+	"kv.raft.command.target_batch_size",
+	"size of a batch of raft commands after which it will be sent without further batching",
+	64<<20, // 64 MB
+	func(size int64) error {
+		if size < 1 {
+			return errors.New("must be positive")
+		}
+		return nil
+	},
+)
+
 // RaftMessageResponseStream is the subset of the
 // MultiRaft_RaftMessageServer interface that is needed for sending responses.
 type RaftMessageResponseStream interface {
@@ -488,28 +502,34 @@ func (t *RaftTransport) processQueue(
 		case err := <-errCh:
 			return err
 		case req := <-ch:
+			budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - int64(req.Size())
 			batch.Requests = append(batch.Requests, *req)
 			req.release()
-			// Pull off as many queued requests as possible.
-			//
-			// TODO(peter): Think about limiting the size of the batch we send.
-			for done := false; !done; {
+			// Pull off as many queued requests as possible, within reason.
+			for budget > 0 {
 				select {
 				case req = <-ch:
+					budget -= int64(req.Size())
 					batch.Requests = append(batch.Requests, *req)
 					req.release()
 				default:
-					done = true
+					budget = -1
 				}
 			}
 
 			err := stream.Send(batch)
-			batch.Requests = batch.Requests[:0]
-
-			atomic.AddInt64(&stats.clientSent, 1)
 			if err != nil {
 				return err
 			}
+
+			// Reuse the Requests slice, but zero out the contents to avoid delaying
+			// GC of memory referenced from within.
+			for i := range batch.Requests {
+				batch.Requests[i] = RaftMessageRequest{}
+			}
+			batch.Requests = batch.Requests[:0]
+
+			atomic.AddInt64(&stats.clientSent, 1)
 		}
	}
}
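
A side note on the tail of the hunk: truncating the reused Requests slice
with batch.Requests[:0] alone would keep the previous elements reachable
through the backing array, which is why the new code zeroes each element
before truncating. A minimal sketch of that idiom, using a hypothetical
message type rather than RaftMessageRequest:

package main

import "fmt"

// message stands in for RaftMessageRequest: a struct whose fields can pin
// large allocations (here, a byte slice) from within a reused batch slice.
type message struct {
	payload []byte
}

func main() {
	batch := make([]message, 0, 4)
	batch = append(batch, message{payload: make([]byte, 64<<20)})

	// Plain truncation (batch = batch[:0]) would keep the element, and its
	// 64 MiB payload, reachable through the backing array, delaying GC.
	// Zeroing each element first drops those references, after which the
	// slice can be truncated and reused without pinning the old payloads.
	for i := range batch {
		batch[i] = message{}
	}
	batch = batch[:0]

	fmt.Println("len:", len(batch), "cap:", cap(batch))
}
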
