From ae5ff985ea320a25732c5a251e4fee45c5628060 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 5 Oct 2021 15:05:41 +0200 Subject: [PATCH 1/2] kvserver: zero out slice before reuse Standard Go error - we were trying to avoid allocations by recycling a slice but weren't zeroing it out before. The occasional long slice that would reference a ton of memory would then effectively keep that large amount of memory alive forever. Touches #71050. Release note: None --- pkg/kv/kvserver/raft_transport.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index c05b75bba03e..52ec94f5b392 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -504,12 +504,18 @@ func (t *RaftTransport) processQueue( } err := stream.Send(batch) - batch.Requests = batch.Requests[:0] - - atomic.AddInt64(&stats.clientSent, 1) if err != nil { return err } + + // Reuse the Requests slice, but zero out the contents to avoid delaying + // GC of memory referenced from within. + for i := range batch.Requests { + batch.Requests[i] = RaftMessageRequest{} + } + batch.Requests = batch.Requests[:0] + + atomic.AddInt64(&stats.clientSent, 1) } } } From 1d959cb5b898ac542157448c28082ec98c83ffe2 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 5 Oct 2021 15:12:55 +0200 Subject: [PATCH 2/2] kvserver: introduce `kv.raft.command.target_batch_size` limit to raft msg batching In #71050, we saw evidence of very large (2.3+GiB) Raft messages being sent on the stream, which overwhelmed both the sender and the receiver. Raft messages are batched up before sending and so what must have happened here is that a large number of reasonably-sized messages (up to 64MiB in this case due to the max_size setting) were merged into a giant blob. As of this commit, we apply the target-size chunking on the batching step before sending messages as well. Closes #71050. Release note: None --- pkg/kv/kvserver/raft_transport.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 52ec94f5b392..77ad3976f054 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -53,6 +54,19 @@ const ( raftIdleTimeout = time.Minute ) +// targetRaftOutgoingBatchSize wraps "kv.raft.command.target_batch_size". +var targetRaftOutgoingBatchSize = settings.RegisterByteSizeSetting( + "kv.raft.command.target_batch_size", + "size of a batch of raft commands after which it will be sent without further batching", + 64<<20, // 64 MB + func(size int64) error { + if size < 1 { + return errors.New("must be positive") + } + return nil + }, +) + // RaftMessageResponseStream is the subset of the // MultiRaft_RaftMessageServer interface that is needed for sending responses. type RaftMessageResponseStream interface { @@ -488,18 +502,18 @@ func (t *RaftTransport) processQueue( case err := <-errCh: return err case req := <-ch: + budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - int64(req.Size()) batch.Requests = append(batch.Requests, *req) req.release() - // Pull off as many queued requests as possible. - // - // TODO(peter): Think about limiting the size of the batch we send. - for done := false; !done; { + // Pull off as many queued requests as possible, within reason. + for budget > 0 { select { case req = <-ch: + budget -= int64(req.Size()) batch.Requests = append(batch.Requests, *req) req.release() default: - done = true + budget = -1 } }