From 581b8393c374fc0c5e3e91f07bc95935afb30df2 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Wed, 8 Mar 2017 20:41:01 -0500 Subject: [PATCH] pubsub: check early if streaming iterator is already drained Fix the fake streaming server to behave more realistically, by waiting for a CloseSend from the client before ending the stream. This revealed that the streaming iterator has an unnecessary delay, due to the stop method blocking on the channel when there were no more messages. So we now check that case before reading from the channel. Also, s/Skip/SkipNow/ in tests. Change-Id: Iacd826c07f6d327475c69e5002cab2381e47d9a5 Reviewed-on: https://code-review.googlesource.com/11374 Reviewed-by: kokoro Reviewed-by: Michael Darakananda Reviewed-by: Kamal Aboul-Hosn --- pubsub/fake_test.go | 12 +++++++----- pubsub/iterator.go | 2 ++ pubsub/streaming_pull_test.go | 12 ++++++------ 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pubsub/fake_test.go b/pubsub/fake_test.go index 552dd1e61fed..9f7f0ef00a0d 100644 --- a/pubsub/fake_test.go +++ b/pubsub/fake_test.go @@ -104,13 +104,15 @@ func (s *fakeServer) StreamingPull(stream pb.Subscriber_StreamingPullServer) err // Send responses. for { if len(s.pullResponses) == 0 { - return nil + // Nothing to send, so wait for the client to shut down the stream. + err := <-errc // a real error, or at least EOF + if err == io.EOF { + return nil + } + return err } pr := s.pullResponses[0] - // Repeat last response. - if len(s.pullResponses) > 1 { - s.pullResponses = s.pullResponses[1:] - } + s.pullResponses = s.pullResponses[1:] if pr.err == io.EOF { return nil } diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 7c644ccf21fc..4da7ca38920c 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -323,6 +323,8 @@ func (it *streamingMessageIterator) stop() { if it.err == nil { it.err = iterator.Done } + // Before reading from the channel, see if we're already drained. + it.checkDrained() it.mu.Unlock() // Nack all the pending messages. // Grab the lock separately for each message to allow the receiver diff --git a/pubsub/streaming_pull_test.go b/pubsub/streaming_pull_test.go index bff1c5366be6..c47adc1c701b 100644 --- a/pubsub/streaming_pull_test.go +++ b/pubsub/streaming_pull_test.go @@ -58,7 +58,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) { func testStreamingPullIteration(t *testing.T, client *Client, server *fakeServer, msgs []*pb.ReceivedMessage) { if !useStreamingPull { - t.Skip() + t.SkipNow() } sub := client.Subscription("s") iter, err := sub.Pull(context.Background()) @@ -101,7 +101,7 @@ func testStreamingPullIteration(t *testing.T, client *Client, server *fakeServer func TestStreamingPullStop(t *testing.T) { if !useStreamingPull { - t.Skip() + t.SkipNow() } // After Stop is called, Next returns iterator.Done. client, server := newFake(t) @@ -128,7 +128,7 @@ func TestStreamingPullStop(t *testing.T) { func TestStreamingPullError(t *testing.T) { if !useStreamingPull { - t.Skip() + t.SkipNow() } client, server := newFake(t) server.addStreamingPullError(grpc.Errorf(codes.Internal, "")) @@ -148,7 +148,7 @@ func TestStreamingPullError(t *testing.T) { func TestStreamingPullCancel(t *testing.T) { if !useStreamingPull { - t.Skip() + t.SkipNow() } // Test that canceling the iterator's context behaves correctly. client, server := newFake(t) @@ -186,7 +186,7 @@ func TestStreamingPullCancel(t *testing.T) { func TestStreamingPullRetry(t *testing.T) { if !useStreamingPull { - t.Skip() + t.SkipNow() } // Check that we retry on io.EOF or Unavailable. client, server := newFake(t) @@ -202,7 +202,7 @@ func TestStreamingPullRetry(t *testing.T) { func TestStreamingPullConcurrent(t *testing.T) { if !useStreamingPull { - t.Skip() + t.SkipNow() } newMsg := func(i int) *pb.ReceivedMessage { return &pb.ReceivedMessage{