Skip to content

Commit

Permalink
pubsub: check early if streaming iterator is already drained
Browse files Browse the repository at this point in the history
Fix the fake streaming server to behave more realistically, by waiting
for a CloseSend from the client before ending the stream.

This revealed that the streaming iterator has an unnecessary delay,
due to the stop method blocking on the channel when there were no more
messages. So we now check that case before reading from the channel.

Also, s/Skip/SkipNow/ in tests.

Change-Id: Iacd826c07f6d327475c69e5002cab2381e47d9a5
Reviewed-on: https://code-review.googlesource.com/11374
Reviewed-by: kokoro <[email protected]>
Reviewed-by: Michael Darakananda <[email protected]>
Reviewed-by: Kamal Aboul-Hosn <[email protected]>
  • Loading branch information
jba committed Mar 10, 2017
1 parent cc13a9b commit 581b839
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
12 changes: 7 additions & 5 deletions pubsub/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,15 @@ func (s *fakeServer) StreamingPull(stream pb.Subscriber_StreamingPullServer) err
// Send responses.
for {
if len(s.pullResponses) == 0 {
return nil
// Nothing to send, so wait for the client to shut down the stream.
err := <-errc // a real error, or at least EOF
if err == io.EOF {
return nil
}
return err
}
pr := s.pullResponses[0]
// Repeat last response.
if len(s.pullResponses) > 1 {
s.pullResponses = s.pullResponses[1:]
}
s.pullResponses = s.pullResponses[1:]
if pr.err == io.EOF {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ func (it *streamingMessageIterator) stop() {
if it.err == nil {
it.err = iterator.Done
}
// Before reading from the channel, see if we're already drained.
it.checkDrained()
it.mu.Unlock()
// Nack all the pending messages.
// Grab the lock separately for each message to allow the receiver
Expand Down
12 changes: 6 additions & 6 deletions pubsub/streaming_pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) {

func testStreamingPullIteration(t *testing.T, client *Client, server *fakeServer, msgs []*pb.ReceivedMessage) {
if !useStreamingPull {
t.Skip()
t.SkipNow()
}
sub := client.Subscription("s")
iter, err := sub.Pull(context.Background())
Expand Down Expand Up @@ -101,7 +101,7 @@ func testStreamingPullIteration(t *testing.T, client *Client, server *fakeServer

func TestStreamingPullStop(t *testing.T) {
if !useStreamingPull {
t.Skip()
t.SkipNow()
}
// After Stop is called, Next returns iterator.Done.
client, server := newFake(t)
Expand All @@ -128,7 +128,7 @@ func TestStreamingPullStop(t *testing.T) {

func TestStreamingPullError(t *testing.T) {
if !useStreamingPull {
t.Skip()
t.SkipNow()
}
client, server := newFake(t)
server.addStreamingPullError(grpc.Errorf(codes.Internal, ""))
Expand All @@ -148,7 +148,7 @@ func TestStreamingPullError(t *testing.T) {

func TestStreamingPullCancel(t *testing.T) {
if !useStreamingPull {
t.Skip()
t.SkipNow()
}
// Test that canceling the iterator's context behaves correctly.
client, server := newFake(t)
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestStreamingPullCancel(t *testing.T) {

func TestStreamingPullRetry(t *testing.T) {
if !useStreamingPull {
t.Skip()
t.SkipNow()
}
// Check that we retry on io.EOF or Unavailable.
client, server := newFake(t)
Expand All @@ -202,7 +202,7 @@ func TestStreamingPullRetry(t *testing.T) {

func TestStreamingPullConcurrent(t *testing.T) {
if !useStreamingPull {
t.Skip()
t.SkipNow()
}
newMsg := func(i int) *pb.ReceivedMessage {
return &pb.ReceivedMessage{
Expand Down

0 comments on commit 581b839

Please sign in to comment.