diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 52ec94f5b392..77ad3976f054 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -53,6 +54,19 @@ const ( raftIdleTimeout = time.Minute ) +// targetRaftOutgoingBatchSize wraps "kv.raft.command.target_batch_size". +var targetRaftOutgoingBatchSize = settings.RegisterByteSizeSetting( + "kv.raft.command.target_batch_size", + "size of a batch of raft commands after which it will be sent without further batching", + 64<<20, // 64 MB + func(size int64) error { + if size < 1 { + return errors.New("must be positive") + } + return nil + }, +) + // RaftMessageResponseStream is the subset of the // MultiRaft_RaftMessageServer interface that is needed for sending responses. type RaftMessageResponseStream interface { @@ -488,18 +502,18 @@ func (t *RaftTransport) processQueue( case err := <-errCh: return err case req := <-ch: + budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - int64(req.Size()) batch.Requests = append(batch.Requests, *req) req.release() - // Pull off as many queued requests as possible. - // - // TODO(peter): Think about limiting the size of the batch we send. - for done := false; !done; { + // Pull off as many queued requests as possible, within reason. + for budget > 0 { select { case req = <-ch: + budget -= int64(req.Size()) batch.Requests = append(batch.Requests, *req) req.release() default: - done = true + budget = -1 } }