diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go
index 7eb0bb452be6..f93f78a04165 100644
--- a/pkg/kv/kvserver/raft_transport.go
+++ b/pkg/kv/kvserver/raft_transport.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -52,6 +53,19 @@ const (
 	raftIdleTimeout = time.Minute
 )
 
+// targetRaftOutgoingBatchSize wraps "kv.raft.command.target_batch_size".
+var targetRaftOutgoingBatchSize = settings.RegisterByteSizeSetting(
+	"kv.raft.command.target_batch_size",
+	"size of a batch of raft commands after which it will be sent without further batching",
+	64<<20, // 64 MB
+	func(size int64) error {
+		if size < 1 {
+			return errors.New("must be positive")
+		}
+		return nil
+	},
+)
+
 // RaftMessageResponseStream is the subset of the
 // MultiRaft_RaftMessageServer interface that is needed for sending responses.
 type RaftMessageResponseStream interface {
@@ -487,28 +501,34 @@ func (t *RaftTransport) processQueue(
 		case err := <-errCh:
 			return err
 		case req := <-ch:
+			budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - int64(req.Size())
 			batch.Requests = append(batch.Requests, *req)
 			req.release()
-			// Pull off as many queued requests as possible.
-			//
-			// TODO(peter): Think about limiting the size of the batch we send.
-			for done := false; !done; {
+			// Pull off as many queued requests as possible, within reason.
+			for budget > 0 {
 				select {
 				case req = <-ch:
+					budget -= int64(req.Size())
 					batch.Requests = append(batch.Requests, *req)
 					req.release()
 				default:
-					done = true
+					budget = -1
 				}
 			}
 
 			err := stream.Send(batch)
-			batch.Requests = batch.Requests[:0]
-
-			atomic.AddInt64(&stats.clientSent, 1)
 			if err != nil {
 				return err
 			}
+
+			// Reuse the Requests slice, but zero out the contents to avoid delaying
+			// GC of memory referenced from within.
+			for i := range batch.Requests {
+				batch.Requests[i] = RaftMessageRequest{}
+			}
+			batch.Requests = batch.Requests[:0]
+
+			atomic.AddInt64(&stats.clientSent, 1)
 		}
 	}
 }
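
For reference, here is a standalone sketch (not part of the patch) of the two techniques the diff applies in processQueue: draining the queue into a batch until a byte budget derived from the target batch size is spent, and zeroing the reused slice before truncating it so the backing array does not keep old requests reachable. The `request`, `batch`, `send`, and `drainAndSend` names are hypothetical stand-ins for `RaftMessageRequest`, `RaftMessageRequestBatch`, `stream.Send`, and the relevant portion of `processQueue`.

```go
package main

import "fmt"

// request is a hypothetical stand-in for RaftMessageRequest.
type request struct {
	payload []byte
}

// Size plays the role of RaftMessageRequest.Size(): the encoded size in bytes.
func (r *request) Size() int { return len(r.payload) }

// batch is a hypothetical stand-in for RaftMessageRequestBatch.
type batch struct {
	requests []request
}

// send stands in for stream.Send.
func send(b *batch) error {
	fmt.Printf("sending %d requests\n", len(b.requests))
	return nil
}

// drainAndSend pulls one request off ch (assumed non-empty), keeps appending
// queued requests while the byte budget lasts, sends the batch, and finally
// resets the slice for reuse without holding on to the old payloads.
func drainAndSend(ch chan *request, b *batch, targetBatchSize int64) error {
	req := <-ch
	budget := targetBatchSize - int64(req.Size())
	b.requests = append(b.requests, *req)

	for budget > 0 {
		select {
		case req = <-ch:
			budget -= int64(req.Size())
			b.requests = append(b.requests, *req)
		default:
			budget = -1 // queue is empty; stop batching early
		}
	}

	if err := send(b); err != nil {
		return err
	}

	// Zero the elements before truncating so the reused backing array does
	// not keep the payload byte slices reachable until they are overwritten.
	for i := range b.requests {
		b.requests[i] = request{}
	}
	b.requests = b.requests[:0]
	return nil
}

func main() {
	ch := make(chan *request, 4)
	for i := 0; i < 4; i++ {
		ch <- &request{payload: make([]byte, 1024)}
	}
	var b batch
	if err := drainAndSend(ch, &b, 64<<20); err != nil {
		fmt.Println("send failed:", err)
	}
}
```

Setting budget to -1 in the default branch mirrors how the patch leaves the loop once the queue is drained, even when the budget has not been spent.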