Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: fix race between transport draining and new RPCs #2919

Merged
merged 2 commits into from
Jul 22, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,9 +1125,9 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
})

if err := checkStats(stats, &rpcStats{
numCallsStarted: int64(countRPC),
numCallsFinished: int64(countRPC),
numCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
numCallsStarted: int64(countRPC)*2 - 1,
numCallsFinished: int64(countRPC)*2 - 1,
numCallsFinishedWithClientFailedToSend: int64(countRPC-1) * 2,
numCallsFinishedKnownReceived: 1,
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1227,9 +1227,9 @@ func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
})

if err := checkStats(stats, &rpcStats{
numCallsStarted: int64(countRPC),
numCallsFinished: int64(countRPC),
numCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
numCallsStarted: int64(countRPC)*2 - 1,
numCallsFinished: int64(countRPC)*2 - 1,
numCallsFinishedWithClientFailedToSend: int64(countRPC-1) * 2,
numCallsFinishedKnownReceived: 1,
}); err != nil {
t.Fatal(err)
Expand Down
45 changes: 31 additions & 14 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,8 +1060,8 @@ func (ac *addrConn) resetTransport() {

ac.mu.Lock()
if ac.state == connectivity.Shutdown {
newTr.Close()
ac.mu.Unlock()
newTr.Close()
return
}
ac.curAddr = addr
Expand All @@ -1076,20 +1076,16 @@ func (ac *addrConn) resetTransport() {
// we restart from the top of the addr list.
<-reconnect.Done()
hcancel()

// Need to reconnect after a READY, the addrConn enters
// TRANSIENT_FAILURE.
// restart connecting - the top of the loop will set state to
// CONNECTING. This is against the current connectivity semantics doc,
// however it allows for graceful behavior for RPCs not yet dispatched
// - unfortunate timing would otherwise lead to the RPC failing even
// though the TRANSIENT_FAILURE state (called for by the doc) would be
// instantaneous.
//
// This will set addrConn to TRANSIENT_FAILURE for a very short period
// of time, and turns CONNECTING. It seems reasonable to skip this, but
// READY-CONNECTING is not a valid transition.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.TransientFailure)
ac.mu.Unlock()
// Ideally we should transition to Idle here and block until there is
// RPC activity that leads to the balancer requesting a reconnect of
// the associated SubConn.
}
}

Expand Down Expand Up @@ -1146,14 +1142,35 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
Authority: ac.cc.authority,
}

once := sync.Once{}
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
once.Do(func() {
if ac.state == connectivity.Ready {
// Prevent this SubConn from being used for new RPCs by setting its
// state to Connecting.
//
// TODO: this should be Idle when grpc-go properly supports it.
ac.updateConnectivityState(connectivity.Connecting)
}
})
ac.mu.Unlock()
reconnect.Fire()
}

onClose := func() {
ac.mu.Lock()
once.Do(func() {
if ac.state == connectivity.Ready {
// Prevent this SubConn from being used for new RPCs by setting its
// state to Connecting.
//
// TODO: this should be Idle when grpc-go properly supports it.
ac.updateConnectivityState(connectivity.Connecting)
}
})
ac.mu.Unlock()
close(onCloseCalled)
reconnect.Fire()
}
Expand Down
13 changes: 5 additions & 8 deletions clientconn_state_transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,11 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
}
}

// When a READY connection is closed, the client enters TRANSIENT FAILURE before CONNECTING.
func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) {
// When a READY connection is closed, the client enters CONNECTING.
func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.TransientFailure,
connectivity.Connecting,
}

Expand Down Expand Up @@ -260,8 +259,8 @@ func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) {
}
}

// When the first connection is closed, the client enters stays in CONNECTING
// until it tries the second address (which succeeds, and then it enters READY).
// When the first connection is closed, the client stays in CONNECTING until it
// tries the second address (which succeeds, and then it enters READY).
func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
Expand Down Expand Up @@ -354,13 +353,11 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
}

// When there are multiple addresses, and we enter READY on one of them, a
// later closure should cause the client to enter TRANSIENT FAILURE before it
// re-enters CONNECTING.
// later closure should cause the client to enter CONNECTING.
func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.TransientFailure,
connectivity.Connecting,
}

Expand Down
12 changes: 7 additions & 5 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
close(s.headerChan)
}

}
hdr := &headerFrame{
hf: headerFields,
Expand Down Expand Up @@ -769,6 +768,9 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
return nil
}
// Call t.onClose before setting the state to closing so that the client
// stops attempting to create new streams as soon as possible.
t.onClose()
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
Expand All @@ -789,7 +791,6 @@ func (t *http2Client) Close() error {
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
t.onClose()
return err
}

Expand Down Expand Up @@ -1085,11 +1086,12 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
default:
t.setGoAwayReason(f)
close(t.goAway)
t.state = draining
t.controlBuf.put(&incomingGoAway{})

// This has to be a new goroutine because we're still using the current goroutine to read in the transport.
// Notify the clientconn about the GOAWAY before we set the state to
// draining, to allow the client to stop attempting to create streams
// before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
t.state = draining
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
Expand Down
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ func (cs *clientStream) shouldRetry(err error) error {
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
}
if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
// First attempt, wait-for-ready, stream unprocessed: transparently retry.
if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
// First attempt, stream unprocessed: transparently retry.
cs.firstAttempt = false
return nil
}
Expand Down
12 changes: 4 additions & 8 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3627,7 +3627,10 @@ func testTransparentRetry(t *testing.T, e env) {
}, {
successAttempt: 2,
failFast: true,
errCode: codes.Unavailable, // We won't retry on fail fast.
}, {
successAttempt: 3,
failFast: true,
errCode: codes.Unavailable,
}}
for _, tc := range testCases {
attempts = 0
Expand Down Expand Up @@ -7133,9 +7136,7 @@ func (s) TestGoAwayThenClose(t *testing.T) {
}
s2 := grpc.NewServer()
defer s2.Stop()
conn2Ready := grpcsync.NewEvent()
ts2 := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
conn2Ready.Fire()
return &testpb.SimpleResponse{}, nil
}}
testpb.RegisterTestServiceServer(s2, ts2)
Expand Down Expand Up @@ -7177,11 +7178,6 @@ func (s) TestGoAwayThenClose(t *testing.T) {
t.Fatal("expected the stream to die, but got a successful Recv")
}

// Connection was dialed, so ac is either in connecting or ready. Because there's a race
// between ac state change and balancer state change, so cc could still be transient
// failure. This wait make sure cc is at least in connecting, and RPCs won't fail after
// this.
cc.WaitForStateChange(ctx, connectivity.TransientFailure)
// Do a bunch of RPCs, make sure it stays stable. These should go to connection 2.
for i := 0; i < 10; i++ {
if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
Expand Down
74 changes: 74 additions & 0 deletions test/goaway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package test

import (
"context"
"net"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
testpb "google.golang.org/grpc/test/grpc_testing"
)

// TestGracefulClientOnGoAway attempts to ensure that when the server sends a
// goaway, a client will never see an error. This requires that the client is
// apprised of the GOAWAY and updates its state accordingly before the
// transport stops accepting new streams. If a subconn is chosen by a picker
// and receives the goaway before creating the stream, an error will occur, but
// upon transparent retry, the clientconn will ensure a ready subconn is
// chosen.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
func (s) TestGracefulClientOnGoAway(t *testing.T) {
	// The server is configured with a short maximum connection age, and the
	// client issues RPCs back-to-back for ten times that duration.
	const maxConnAge = 100 * time.Millisecond
	const testTime = maxConnAge * 10

	ss := &stubServer{
		emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
			return &testpb.Empty{}, nil
		},
	}

	s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge}))
	defer s.Stop()
	testpb.RegisterTestServiceServer(s, ss)

	// Port 0 lets the OS pick a free port; the client dials lis.Addr() below.
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Failed to create listener: %v", err)
	}
	go s.Serve(lis)

	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
	if err != nil {
		t.Fatalf("Failed to dial server: %v", err)
	}
	defer cc.Close()
	c := testpb.NewTestServiceClient(cc)

	// Every RPC issued during the test window must succeed; a single failure
	// fails the test.
	endTime := time.Now().Add(testTime)
	for time.Now().Before(endTime) {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		_, err := c.EmptyCall(ctx, &testpb.Empty{})
		// Release the context as soon as the unary call has returned and
		// before asserting: t.Fatalf stops the test goroutine, so a cancel()
		// placed after the check would be skipped on the failure path.
		cancel()
		if err != nil {
			t.Fatalf("EmptyCall(_, _) = _, %v; want _, <nil>", err)
		}
	}
}