kvserver: introduce kv.raft.command.target_batch_size limit to raft msg batching

In #71050, we saw evidence of very large (2.3+GiB) Raft messages being
sent on the stream, which overwhelmed both the sender and the receiver.
Raft messages are batched up before sending and so what must have
happened here is that a large number of reasonably-sized messages (up to
64MiB in this case due to the max_size setting) were merged into a giant
blob. As of this commit, we apply the target-size chunking on the batching
step before sending messages as well.
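
The budgeted batching described above can be sketched in isolation. This is a minimal illustration, not the code from this commit: `message`, `drainWithBudget`, and `totalSize` are hypothetical names, and sizes are plain integers rather than encoded Raft requests. It assumes the budget is checked before each pull, so a batch may exceed the target by at most the size of the last message pulled.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a queued Raft request; only its
// encoded size matters for this sketch.
type message struct {
	size int64
}

// drainWithBudget blocks for the first message, then keeps pulling queued
// messages without blocking until the byte budget is used up or the queue is
// momentarily empty. The budget is checked before each pull, so the batch can
// exceed target by at most the size of the last message pulled.
func drainWithBudget(ch <-chan message, target int64) []message {
	first := <-ch
	batch := []message{first}
	budget := target - first.size
	for budget > 0 {
		select {
		case m := <-ch:
			budget -= m.size
			batch = append(batch, m)
		default:
			// Nothing else is queued; send what we have.
			return batch
		}
	}
	return batch
}

// totalSize sums the sizes of all messages in a batch.
func totalSize(batch []message) int64 {
	var n int64
	for _, m := range batch {
		n += m.size
	}
	return n
}

func main() {
	ch := make(chan message, 8)
	for _, s := range []int64{40, 40, 40, 40, 40} {
		ch <- message{size: s}
	}
	const target = 100
	for len(ch) > 0 {
		batch := drainWithBudget(ch, target)
		fmt.Printf("batch of %d messages, %d bytes\n", len(batch), totalSize(batch))
	}
}
```

With five 40-byte messages and a 100-byte target, the sketch produces one batch of three messages (120 bytes) and one of two (80 bytes). The target is therefore a soft limit: it bounds how long batching continues, not the exact size of the batch.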

Closes #71050.

Release note: None
tbg committed Oct 19, 2021
1 parent ae5ff98 commit 1d959cb
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions pkg/kv/kvserver/raft_transport.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -53,6 +54,19 @@ const (
 	raftIdleTimeout = time.Minute
 )

+// targetRaftOutgoingBatchSize wraps "kv.raft.command.target_batch_size".
+var targetRaftOutgoingBatchSize = settings.RegisterByteSizeSetting(
+	"kv.raft.command.target_batch_size",
+	"size of a batch of raft commands after which it will be sent without further batching",
+	64<<20, // 64 MB
+	func(size int64) error {
+		if size < 1 {
+			return errors.New("must be positive")
+		}
+		return nil
+	},
+)
+
 // RaftMessageResponseStream is the subset of the
 // MultiRaft_RaftMessageServer interface that is needed for sending responses.
 type RaftMessageResponseStream interface {
@@ -488,18 +502,18 @@ func (t *RaftTransport) processQueue(
 		case err := <-errCh:
 			return err
 		case req := <-ch:
+			budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - int64(req.Size())
 			batch.Requests = append(batch.Requests, *req)
 			req.release()
-			// Pull off as many queued requests as possible.
-			//
-			// TODO(peter): Think about limiting the size of the batch we send.
-			for done := false; !done; {
+			// Pull off as many queued requests as possible, within reason.
+			for budget > 0 {
 				select {
 				case req = <-ch:
+					budget -= int64(req.Size())
 					batch.Requests = append(batch.Requests, *req)
 					req.release()
 				default:
-					done = true
+					budget = -1
 				}
 			}

