From 56927959a903a0236d35277e4369ce68c24314c6 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 5 Oct 2021 13:05:41 +0000 Subject: [PATCH 1/2] kvserver: zero out slice before reuse Standard Go error - we were trying to avoid allocations by recycling a slice but weren't zeroing it out before. The occasional long slice that would reference a ton of memory would then effectively keep that large amount of memory alive forever. Touches #71050. Release note: None --- pkg/kv/kvserver/raft_transport.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 7eb0bb452be6..6798d2b789bf 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -503,12 +503,18 @@ func (t *RaftTransport) processQueue( } err := stream.Send(batch) - batch.Requests = batch.Requests[:0] - - atomic.AddInt64(&stats.clientSent, 1) if err != nil { return err } + + // Reuse the Requests slice, but zero out the contents to avoid delaying + // GC of memory referenced from within. + for i := range batch.Requests { + batch.Requests[i] = RaftMessageRequest{} + } + batch.Requests = batch.Requests[:0] + + atomic.AddInt64(&stats.clientSent, 1) } } } From 490ccfe9d61e1714af22c7fc0a86326e8a00ab41 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 5 Oct 2021 13:12:55 +0000 Subject: [PATCH 2/2] kvserver: introduce `kv.raft.command.target_batch_size` limit to raft msg batching In #71050, we saw evidence of very large (2.3+GiB) Raft messages being sent on the stream, which overwhelmed both the sender and the receiver. Raft messages are batched up before sending and so what must have happened here is that a large number of reasonably-sized messages (up to 64MiB in this case due to the max_size setting) were merged into a giant blob. As of this commit, we apply the target-size chunking on the batching step before sending messages as well. Closes #71050. 
Release note: None --- pkg/kv/kvserver/raft_transport.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 6798d2b789bf..f93f78a04165 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -52,6 +53,19 @@ const ( raftIdleTimeout = time.Minute ) +// targetRaftOutgoingBatchSize wraps "kv.raft.command.target_batch_size". +var targetRaftOutgoingBatchSize = settings.RegisterByteSizeSetting( + "kv.raft.command.target_batch_size", + "size of a batch of raft commands after which it will be sent without further batching", + 64<<20, // 64 MiB + func(size int64) error { + if size < 1 { + return errors.New("must be positive") + } + return nil + }, +) + // RaftMessageResponseStream is the subset of the // MultiRaft_RaftMessageServer interface that is needed for sending responses. type RaftMessageResponseStream interface { @@ -487,18 +501,18 @@ func (t *RaftTransport) processQueue( case err := <-errCh: return err case req := <-ch: + budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - int64(req.Size()) batch.Requests = append(batch.Requests, *req) req.release() - // Pull off as many queued requests as possible. - // - // TODO(peter): Think about limiting the size of the batch we send. - for done := false; !done; { + // Pull off as many queued requests as possible, within reason. + for budget > 0 { select { case req = <-ch: + budget -= int64(req.Size()) batch.Requests = append(batch.Requests, *req) req.release() default: - done = true + budget = -1 } }