Skip to content

Commit

Permalink
kvserver: Correct disablingClientStream behavior
Browse files Browse the repository at this point in the history
Change disablingClientStream to buffer messages instead of dropping
them when it is in the disabled state. Once it is no longer disabled
send all the outstanding messages. Resolves a flake: #84041

Release note: None
  • Loading branch information
andrewbaptist committed Jul 8, 2022
1 parent ab9eabf commit 7405c56
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4662,15 +4662,38 @@ func TestTracingDoesNotRaceWithCancelation(t *testing.T) {
wg.Wait()
}

// disablingClientStream allows delaying rRPC messages based on a user provided
// function. It is OK to arbitrarily delay messages, but if they are dropped, it
// breaks application level expectations of in-order delivery. By returning nil
// immediately, and then sending when the stream is not disabled, we don't break
// the SendMsg contract. Note that this test is still a little too high level in
// the sense that it is blocking at the gRPC layer, and not the TCP layer, but
// that is much more complex to fully implement, and this should get equivalent
// results.
type disablingClientStream struct {
grpc.ClientStream
disabled func() bool
wasDisabled bool
buffer []interface{}
disabled func() bool
}

func (cs *disablingClientStream) SendMsg(m interface{}) error {
// When the stream is disabled, buffer all the messages, but don't send.
if cs.disabled() {
cs.buffer = append(cs.buffer, m)
cs.wasDisabled = true
return nil
}
// Now that it transitioned from disabled to not disabled, flush all the
// messages out in the same order as originally expected.
if cs.wasDisabled {
for _, buf := range cs.buffer {
_ = cs.ClientStream.SendMsg(buf)
}
cs.buffer = nil
cs.wasDisabled = false
}

return cs.ClientStream.SendMsg(m)
}

Expand Down

0 comments on commit 7405c56

Please sign in to comment.