From 7405c568e482fbfff4ad2bcba101ef42678d0b1a Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Fri, 8 Jul 2022 16:37:05 -0400 Subject: [PATCH] kvserver: Correct disablingClientStream behavior Change disablingClientStream to buffer messages instead of dropping them when it is in the disabled state. Once it is no longer disabled send all the outstanding messages. Resolves a flake: #84041 Release note: None --- pkg/kv/kvserver/client_raft_test.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 45a49d9c66cf..3c0121b8cdbb 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -4662,15 +4662,38 @@ func TestTracingDoesNotRaceWithCancelation(t *testing.T) { wg.Wait() } +// disablingClientStream allows delaying rRPC messages based on a user provided +// function. It is OK to arbitrarily delay messages, but if they are dropped, it +// breaks application level expectations of in-order delivery. By returning nil +// immediately, and then sending when the stream is not disabled, we don't break +// the SendMsg contract. Note that this test is still a little too high level in +// the sense that it is blocking at the gRPC layer, and not the TCP layer, but +// that is much more complex to fully implement, and this should get equivalent +// results. type disablingClientStream struct { grpc.ClientStream - disabled func() bool + wasDisabled bool + buffer []interface{} + disabled func() bool } func (cs *disablingClientStream) SendMsg(m interface{}) error { + // When the stream is disabled, buffer all the messages, but don't send. if cs.disabled() { + cs.buffer = append(cs.buffer, m) + cs.wasDisabled = true return nil } + // Now that it transitioned from disabled to not disabled, flush all the + // messages out in the same order as originally expected. + if cs.wasDisabled { + for _, buf := range cs.buffer { + _ = cs.ClientStream.SendMsg(buf) + } + cs.buffer = nil + cs.wasDisabled = false + } + return cs.ClientStream.SendMsg(m) }