From 23ca52827997158a6a0de1ddcadafa25e7b4f53e Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 16 Jul 2019 09:41:37 -0700 Subject: [PATCH 1/2] client: fix race between transport draining and new RPCs Before these fixes, it was possible to see errors on new RPCs after a connection began draining, and before establishing a new connection. There is an inherent race between choosing a SubConn and attempting to creating a stream on it. We should be able to avoid application-visible RPC errors due to this with transparent retry. However, several bugs were preventing this from working correctly: 1. Non-wait-for-ready RPCs were skipping transparent retry, though the retry design calls for retrying them. 2. The transport closed itself (and would consequently error new RPCs) before notifying the SubConn that it was draining. 3. The SubConn wasn't synchronously updating itself once it was notified about the closing or draining state. 4. The SubConn would go into the TRANSIENT_FAILURE state instantaneously, causing RPCs to fail instead of queue. --- balancer/grpclb/grpclb_test.go | 12 ++--- clientconn.go | 45 ++++++++++++------ clientconn_state_transition_test.go | 13 ++--- internal/transport/http2_client.go | 12 +++-- stream.go | 4 +- test/end2end_test.go | 12 ++--- test/goaway_test.go | 74 +++++++++++++++++++++++++++++ 7 files changed, 129 insertions(+), 43 deletions(-) create mode 100644 test/goaway_test.go diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 984d7f40cb50..278ca91a09c3 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -1125,9 +1125,9 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) { }) if err := checkStats(stats, &rpcStats{ - numCallsStarted: int64(countRPC), - numCallsFinished: int64(countRPC), - numCallsFinishedWithClientFailedToSend: int64(countRPC - 1), + numCallsStarted: int64(countRPC)*2 - 1, + numCallsFinished: int64(countRPC)*2 - 1, + numCallsFinishedWithClientFailedToSend: int64(countRPC-1) * 2, numCallsFinishedKnownReceived: 1, }); err != nil { t.Fatal(err) @@ -1227,9 +1227,9 @@ func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) { }) if err := checkStats(stats, &rpcStats{ - numCallsStarted: int64(countRPC), - numCallsFinished: int64(countRPC), - numCallsFinishedWithClientFailedToSend: int64(countRPC - 1), + numCallsStarted: int64(countRPC)*2 - 1, + numCallsFinished: int64(countRPC)*2 - 1, + numCallsFinishedWithClientFailedToSend: int64(countRPC-1) * 2, numCallsFinishedKnownReceived: 1, }); err != nil { t.Fatal(err) diff --git a/clientconn.go b/clientconn.go index 726e5584ed50..a7643df7d297 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1060,8 +1060,8 @@ func (ac *addrConn) resetTransport() { ac.mu.Lock() if ac.state == connectivity.Shutdown { - newTr.Close() ac.mu.Unlock() + newTr.Close() return } ac.curAddr = addr @@ -1076,20 +1076,16 @@ func (ac *addrConn) resetTransport() { // we restart from the top of the addr list. <-reconnect.Done() hcancel() - - // Need to reconnect after a READY, the addrConn enters - // TRANSIENT_FAILURE. + // restart connecting - the top of the loop will set state to + // CONNECTING. This is against the current connectivity semantics doc, + // however it allows for graceful behavior for RPCs not yet dispatched + // - unfortunate timing would otherwise lead to the RPC failing even + // though the TRANSIENT_FAILURE state (called for by the doc) would be + // instantaneous. // - // This will set addrConn to TRANSIENT_FAILURE for a very short period - // of time, and turns CONNECTING. It seems reasonable to skip this, but - // READY-CONNECTING is not a valid transition. - ac.mu.Lock() - if ac.state == connectivity.Shutdown { - ac.mu.Unlock() - return - } - ac.updateConnectivityState(connectivity.TransientFailure) - ac.mu.Unlock() + // Ideally we should transition to Idle here and block until there is + // RPC activity that leads to the balancer requesting a reconnect of + // the associated SubConn. } } @@ -1146,14 +1142,35 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne Authority: ac.cc.authority, } + once := sync.Once{} onGoAway := func(r transport.GoAwayReason) { ac.mu.Lock() ac.adjustParams(r) + once.Do(func() { + if ac.state == connectivity.Ready { + // Prevent this SubConn from being used for new RPCs by setting its + // state to Connecting. + // + // TODO: this should be Idle when grpc-go properly supports it. + ac.updateConnectivityState(connectivity.Connecting) + } + }) ac.mu.Unlock() reconnect.Fire() } onClose := func() { + ac.mu.Lock() + once.Do(func() { + if ac.state == connectivity.Ready { + // Prevent this SubConn from being used for new RPCs by setting its + // state to Connecting. + // + // TODO: this should be Idle when grpc-go properly supports it. + ac.updateConnectivityState(connectivity.Connecting) + } + }) + ac.mu.Unlock() close(onCloseCalled) reconnect.Fire() } diff --git a/clientconn_state_transition_test.go b/clientconn_state_transition_test.go index e38239678f76..1309e8941352 100644 --- a/clientconn_state_transition_test.go +++ b/clientconn_state_transition_test.go @@ -193,12 +193,11 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s } } -// When a READY connection is closed, the client enters TRANSIENT FAILURE before CONNECTING. -func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) { +// When a READY connection is closed, the client enters CONNECTING. +func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) { want := []connectivity.State{ connectivity.Connecting, connectivity.Ready, - connectivity.TransientFailure, connectivity.Connecting, } @@ -260,8 +259,8 @@ func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) { } } -// When the first connection is closed, the client enters stays in CONNECTING -// until it tries the second address (which succeeds, and then it enters READY). +// When the first connection is closed, the client stays in CONNECTING until it +// tries the second address (which succeeds, and then it enters READY). func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) { want := []connectivity.State{ connectivity.Connecting, @@ -354,13 +353,11 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) } // When there are multiple addresses, and we enter READY on one of them, a -// later closure should cause the client to enter TRANSIENT FAILURE before it -// re-enters CONNECTING. +// later closure should cause the client to enter CONNECTING func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { want := []connectivity.State{ connectivity.Connecting, connectivity.Ready, - connectivity.TransientFailure, connectivity.Connecting, } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 9687ec7baa72..20e1bb935646 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -556,7 +556,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { close(s.headerChan) } - } hdr := &headerFrame{ hf: headerFields, @@ -769,6 +768,9 @@ func (t *http2Client) Close() error { t.mu.Unlock() return nil } + // Call t.onClose before setting the state to closing to prevent the client + // from attempting to create new streams ASAP. + t.onClose() t.state = closing streams := t.activeStreams t.activeStreams = nil @@ -789,7 +791,6 @@ func (t *http2Client) Close() error { } t.statsHandler.HandleConn(t.ctx, connEnd) } - t.onClose() return err } @@ -1085,11 +1086,12 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { default: t.setGoAwayReason(f) close(t.goAway) - t.state = draining t.controlBuf.put(&incomingGoAway{}) - - // This has to be a new goroutine because we're still using the current goroutine to read in the transport. + // Notify the clientconn about the GOAWAY before we set the state to + // draining, to allow the client to stop attempting to create streams + // before disallowing new streams on this connection. t.onGoAway(t.goAwayReason) + t.state = draining } // All streams with IDs greater than the GoAwayId // and smaller than the previous GoAway ID should be killed. diff --git a/stream.go b/stream.go index db14c3225d17..6be1be6da730 100644 --- a/stream.go +++ b/stream.go @@ -457,8 +457,8 @@ func (cs *clientStream) shouldRetry(err error) error { if cs.attempt.s != nil { <-cs.attempt.s.Done() } - if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) { - // First attempt, wait-for-ready, stream unprocessed: transparently retry. + if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) { + // First attempt, stream unprocessed: transparently retry. cs.firstAttempt = false return nil } diff --git a/test/end2end_test.go b/test/end2end_test.go index c0489256805e..7a90a6acef6a 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3627,7 +3627,10 @@ func testTransparentRetry(t *testing.T, e env) { }, { successAttempt: 2, failFast: true, - errCode: codes.Unavailable, // We won't retry on fail fast. + }, { + successAttempt: 3, + failFast: true, + errCode: codes.Unavailable, }} for _, tc := range testCases { attempts = 0 @@ -7133,9 +7136,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { } s2 := grpc.NewServer() defer s2.Stop() - conn2Ready := grpcsync.NewEvent() ts2 := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - conn2Ready.Fire() return &testpb.SimpleResponse{}, nil }} testpb.RegisterTestServiceServer(s2, ts2) @@ -7177,11 +7178,6 @@ func (s) TestGoAwayThenClose(t *testing.T) { t.Fatal("expected the stream to die, but got a successful Recv") } - // Connection was dialed, so ac is either in connecting or ready. Because there's a race - // between ac state change and balancer state change, so cc could still be transient - // failure. This wait make sure cc is at least in connecting, and RPCs won't fail after - // this. - cc.WaitForStateChange(ctx, connectivity.TransientFailure) // Do a bunch of RPCs, make sure it stays stable. These should go to connection 2. for i := 0; i < 10; i++ { if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { diff --git a/test/goaway_test.go b/test/goaway_test.go new file mode 100644 index 000000000000..69c6f33e66fd --- /dev/null +++ b/test/goaway_test.go @@ -0,0 +1,74 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package test + +import ( + "context" + "net" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +// TestGracefulClientOnGoAway attempts to ensure that when the server sends a +// goaway, a client will never see an error. This requires that the client is +// appraised of the GOAWAY and updates its state accordingly before the +// transport stops accepting new streams. If a subconn is chosen by a picker +// and receives the goaway before creating the stream, an error will occur, but +// upon transparent retry, the clientconn will ensure a ready subconn is +// chosen. +func (s) TestGracefulClientOnGoAway(t *testing.T) { + const maxConnAge = 100 * time.Millisecond + const testTime = maxConnAge * 10 + + ss := &stubServer{ + emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + } + + s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge})) + defer s.Stop() + testpb.RegisterTestServiceServer(s, ss) + + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + go s.Serve(lis) + + cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) + if err != nil { + t.Fatalf("Failed to dial server: %v", err) + } + defer cc.Close() + c := testpb.NewTestServiceClient(cc) + + endTime := time.Now().Add(testTime) + for time.Now().Before(endTime) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall(_, _) = _, %v; want _, ", err) + } + cancel() + } +} From 9e06ffcf4b3df958e08744daaefd35fae4ab1375 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Mon, 22 Jul 2019 15:34:47 -0700 Subject: [PATCH 2/2] "how do they work?" --- test/goaway_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/goaway_test.go b/test/goaway_test.go index 69c6f33e66fd..55f79ebc8548 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -30,12 +30,12 @@ import ( ) // TestGracefulClientOnGoAway attempts to ensure that when the server sends a -// goaway, a client will never see an error. This requires that the client is -// appraised of the GOAWAY and updates its state accordingly before the -// transport stops accepting new streams. If a subconn is chosen by a picker -// and receives the goaway before creating the stream, an error will occur, but -// upon transparent retry, the clientconn will ensure a ready subconn is -// chosen. +// GOAWAY (in this test, by configuring max connection age on the server), a +// client will never see an error. This requires that the client is appraised +// of the GOAWAY and updates its state accordingly before the transport stops +// accepting new streams. If a subconn is chosen by a picker and receives the +// goaway before creating the stream, an error will occur, but upon transparent +// retry, the clientconn will ensure a ready subconn is chosen. func (s) TestGracefulClientOnGoAway(t *testing.T) { const maxConnAge = 100 * time.Millisecond const testTime = maxConnAge * 10