From 6c316d67e2292325f8d32e42a4e7ef68a2de0cb5 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Fri, 28 Jun 2024 13:15:27 +0530 Subject: [PATCH 01/19] fix: add timeout for loopy writer wait to close itself --- internal/transport/http2_client.go | 8 +- internal/transport/transport_test.go | 145 ++++++++++++++++++++++++++- 2 files changed, 150 insertions(+), 3 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 3c63c706986d..f63fae5b8b5a 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1009,7 +1009,12 @@ func (t *http2Client) Close(err error) { // Per HTTP/2 spec, a GOAWAY frame must be sent before closing the // connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err}) - <-t.writerDone + timer := time.NewTimer(5 * time.Second) + select { + case <-t.writerDone: + case <-timer.C: + t.logger.Warningf("timeout waiting for the loopy writer to be closed.") + } t.cancel() t.conn.Close() channelz.RemoveEntry(t.channelz.ID) @@ -1035,6 +1040,7 @@ func (t *http2Client) Close(err error) { } sh.HandleConn(t.ctx, connEnd) } + t.logger.Infof("Closed the client connection") } // GracefulClose sets the state to draining, which prevents new streams from diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 7887c8be8647..23b2fbd284c7 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -90,6 +90,7 @@ const ( invalidHeaderField delayRead pingpong + goAwayFrameSize = 42 ) func (h *testStreamHandler) handleStreamAndNotify(s *Stream) { @@ -2656,7 +2657,7 @@ func TestConnectionError_Unwrap(t *testing.T) { } } -// Test that in the event of a graceful client transport shutdown, i.e., +// TestClientSendsAGoAwayFrame verifies that in the event of a graceful client transport shutdown, i.e., // clientTransport.Close(), client sends a goaway to the server with the correct // error code and debug data. func (s) TestClientSendsAGoAwayFrame(t *testing.T) { @@ -2736,7 +2737,6 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { // Wait until server receives the headers and settings frame as part of greet. <-greetDone ct.Close(errors.New("manually closed by client")) - t.Logf("Closed the client connection") select { case err := <-errorCh: if err != nil { @@ -2746,3 +2746,144 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { t.Errorf("Context timed out") } } + +var writeHangSignal chan struct{} + +type hangingConn struct { + net.Conn +} + +func (hc *hangingConn) Read(b []byte) (n int, err error) { + n, err = hc.Conn.Read(b) + return n, err +} + +func (hc *hangingConn) Write(b []byte) (n int, err error) { + n, err = hc.Conn.Write(b) + if n == goAwayFrameSize { // GOAWAY frame + writeHangSignal = make(chan struct{}) + time.Sleep(15 * time.Second) + } + return n, err +} + +func (hc *hangingConn) Close() error { + return hc.Conn.Close() +} + +func (hc *hangingConn) LocalAddr() net.Addr { + return hc.Conn.LocalAddr() +} + +func (hc *hangingConn) RemoteAddr() net.Addr { + return hc.Conn.RemoteAddr() +} + +func (hc *hangingConn) SetDeadline(t time.Time) error { + return hc.Conn.SetDeadline(t) +} + +func (hc *hangingConn) SetReadDeadline(t time.Time) error { + return hc.Conn.SetReadDeadline(t) +} + +func (hc *hangingConn) SetWriteDeadline(t time.Time) error { + return hc.Conn.SetWriteDeadline(t) +} + +func hangingDialer(_ context.Context, addr string) (net.Conn, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + return &hangingConn{Conn: conn}, nil +} + +// TestClientCloseTimeoutOnHang verifies that in the event of a graceful +// client transport shutdown, i.e., clientTransport.Close(), if the conn hung +// forever, client should still be close itself and do not wait for long. +func (s) TestClientCloseTimeoutOnHang(t *testing.T) { + // Create a server. + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening: %v", err) + } + defer lis.Close() + // greetDone is used to notify when server is done greeting the client. + greetDone := make(chan struct{}) + // errorCh verifies that desired GOAWAY not received by server + errorCh := make(chan error) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Launch the server. + go func() { + sconn, err := lis.Accept() + if err != nil { + t.Errorf("Error while accepting: %v", err) + } + defer sconn.Close() + if _, err := io.ReadFull(sconn, make([]byte, len(clientPreface))); err != nil { + t.Errorf("Error while writing settings ack: %v", err) + return + } + sfr := http2.NewFramer(sconn, sconn) + if err := sfr.WriteSettings(); err != nil { + t.Errorf("Error while writing settings %v", err) + return + } + fr, _ := sfr.ReadFrame() + if _, ok := fr.(*http2.SettingsFrame); !ok { + t.Errorf("Expected settings frame, got %v", fr) + } + fr, _ = sfr.ReadFrame() + if fr, ok := fr.(*http2.SettingsFrame); !ok || !fr.IsAck() { + t.Errorf("Expected settings ACK frame, got %v", fr) + } + fr, _ = sfr.ReadFrame() + if fr, ok := fr.(*http2.HeadersFrame); !ok || !fr.Flags.Has(http2.FlagHeadersEndHeaders) { + t.Errorf("Expected Headers frame with END_HEADERS frame, got %v", fr) + } + close(greetDone) + + frame, err := sfr.ReadFrame() + if err != nil { + return + } + switch fr := frame.(type) { + case *http2.GoAwayFrame: + // Records that the server successfully received a GOAWAY frame. + goAwayFrame := fr + if goAwayFrame.ErrCode == http2.ErrCodeNo { + t.Logf("Received goAway frame from client") + close(errorCh) + } else { + errorCh <- fmt.Errorf("received unexpected goAway frame: %v", err) + close(errorCh) + } + return + default: + errorCh <- fmt.Errorf("server received a frame other than GOAWAY: %v", err) + close(errorCh) + return + } + }() + + ct, err := NewClientTransport(ctx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{Dialer: hangingDialer}, func(GoAwayReason) {}) + if err != nil { + t.Fatalf("Error while creating client transport: %v", err) + } + _, err = ct.NewStream(ctx, &CallHdr{}) + if err != nil { + t.Fatalf("failed to open stream: %v", err) + } + // Wait until server receives the headers and settings frame as part of greet. + <-greetDone + ct.Close(errors.New("manually closed by client")) + defer close(writeHangSignal) + select { + case <-writeHangSignal: + t.Errorf("error: channel closed too early.") + case <-ctx.Done(): + } + +} From 284f45bdd41ca328ebb5d1c0b8be455e49312516 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Tue, 2 Jul 2024 10:54:19 +0530 Subject: [PATCH 02/19] refactor: resolved suggestions from purnesh --- internal/transport/http2_client.go | 30 ++++--- internal/transport/transport_test.go | 126 +++++++-------------------- 2 files changed, 49 insertions(+), 107 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index f63fae5b8b5a..68e18c9cf623 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -59,6 +59,8 @@ import ( // atomically. var clientConnectionCounter uint64 +var GoAwayLoopyWriterTimeout = time.Second + var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool)) // http2Client implements the ClientTransport interface with HTTP2. @@ -1006,29 +1008,29 @@ func (t *http2Client) Close(err error) { t.kpDormancyCond.Signal() } t.mu.Unlock() - // Per HTTP/2 spec, a GOAWAY frame must be sent before closing the - // connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. + var st *status.Status + // Per HTTP/2 spec, a GOAWAY frame must be sent before closing the connection. + // See https://httpwg.org/specs/rfc7540.html#GOAWAY. It also waits for loopyWriter to + // be closed with a timer to avoid the indefinite blocking. t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err}) - timer := time.NewTimer(5 * time.Second) + timer := time.NewTimer(GoAwayLoopyWriterTimeout) select { case <-t.writerDone: + // Append info about previous goaway's if there were any, since this may be important + // for understanding the root cause for this connection to be closed. + _, goAwayDebugMessage := t.GetGoAwayReason() + if len(goAwayDebugMessage) > 0 { + st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage) + err = st.Err() + } else { + st = status.New(codes.Unavailable, err.Error()) + } case <-timer.C: t.logger.Warningf("timeout waiting for the loopy writer to be closed.") } t.cancel() t.conn.Close() channelz.RemoveEntry(t.channelz.ID) - // Append info about previous goaways if there were any, since this may be important - // for understanding the root cause for this connection to be closed. - _, goAwayDebugMessage := t.GetGoAwayReason() - - var st *status.Status - if len(goAwayDebugMessage) > 0 { - st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage) - err = st.Err() - } else { - st = status.New(codes.Unavailable, err.Error()) - } // Notify all active streams. for _, s := range streams { diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 23b2fbd284c7..f0458abd4cc5 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -90,9 +90,10 @@ const ( invalidHeaderField delayRead pingpong - goAwayFrameSize = 42 ) +const goAwayFrameSize = 42 + func (h *testStreamHandler) handleStreamAndNotify(s *Stream) { if h.notify == nil { return @@ -2661,94 +2662,15 @@ func TestConnectionError_Unwrap(t *testing.T) { // clientTransport.Close(), client sends a goaway to the server with the correct // error code and debug data. func (s) TestClientSendsAGoAwayFrame(t *testing.T) { - // Create a server. - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error while listening: %v", err) - } - defer lis.Close() - // greetDone is used to notify when server is done greeting the client. - greetDone := make(chan struct{}) - // errorCh verifies that desired GOAWAY not received by server - errorCh := make(chan error) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // Launch the server. - go func() { - sconn, err := lis.Accept() - if err != nil { - t.Errorf("Error while accepting: %v", err) - } - defer sconn.Close() - if _, err := io.ReadFull(sconn, make([]byte, len(clientPreface))); err != nil { - t.Errorf("Error while writing settings ack: %v", err) - return - } - sfr := http2.NewFramer(sconn, sconn) - if err := sfr.WriteSettings(); err != nil { - t.Errorf("Error while writing settings %v", err) - return - } - fr, _ := sfr.ReadFrame() - if _, ok := fr.(*http2.SettingsFrame); !ok { - t.Errorf("Expected settings frame, got %v", fr) - } - fr, _ = sfr.ReadFrame() - if fr, ok := fr.(*http2.SettingsFrame); !ok || !fr.IsAck() { - t.Errorf("Expected settings ACK frame, got %v", fr) - } - fr, _ = sfr.ReadFrame() - if fr, ok := fr.(*http2.HeadersFrame); !ok || !fr.Flags.Has(http2.FlagHeadersEndHeaders) { - t.Errorf("Expected Headers frame with END_HEADERS frame, got %v", fr) - } - close(greetDone) - - frame, err := sfr.ReadFrame() - if err != nil { - return - } - switch fr := frame.(type) { - case *http2.GoAwayFrame: - // Records that the server successfully received a GOAWAY frame. - goAwayFrame := fr - if goAwayFrame.ErrCode == http2.ErrCodeNo { - t.Logf("Received goAway frame from client") - close(errorCh) - } else { - errorCh <- fmt.Errorf("received unexpected goAway frame: %v", err) - close(errorCh) - } - return - default: - errorCh <- fmt.Errorf("server received a frame other than GOAWAY: %v", err) - close(errorCh) - return - } - }() - - ct, err := NewClientTransport(ctx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {}) - if err != nil { - t.Fatalf("Error while creating client transport: %v", err) - } - _, err = ct.NewStream(ctx, &CallHdr{}) - if err != nil { - t.Fatalf("failed to open stream: %v", err) - } - // Wait until server receives the headers and settings frame as part of greet. - <-greetDone - ct.Close(errors.New("manually closed by client")) - select { - case err := <-errorCh: - if err != nil { - t.Errorf("Error receiving the GOAWAY frame: %v", err) - } - case <-ctx.Done(): - t.Errorf("Context timed out") - } + createClientServerConn(t, ConnectOptions{}) } +// writeHangSignal is used to hang the net.Conn Write for complete test duration. var writeHangSignal chan struct{} +// hangingConn is a net.Conn wrapper for testing, simulating hanging connections +// after a GOAWAY frame is sent, of which Write operations pause until explicitly signaled +// or a timeout occurs. type hangingConn struct { net.Conn } @@ -2761,8 +2683,7 @@ func (hc *hangingConn) Read(b []byte) (n int, err error) { func (hc *hangingConn) Write(b []byte) (n int, err error) { n, err = hc.Conn.Write(b) if n == goAwayFrameSize { // GOAWAY frame - writeHangSignal = make(chan struct{}) - time.Sleep(15 * time.Second) + <-writeHangSignal } return n, err } @@ -2801,8 +2722,25 @@ func hangingDialer(_ context.Context, addr string) (net.Conn, error) { // TestClientCloseTimeoutOnHang verifies that in the event of a graceful // client transport shutdown, i.e., clientTransport.Close(), if the conn hung -// forever, client should still be close itself and do not wait for long. +// for LoopyWriterTimeout, client should still be close itself and should +// not wait for long. func (s) TestClientCloseTimeoutOnHang(t *testing.T) { + origGoAwayLoopyTimeout := GoAwayLoopyWriterTimeout + GoAwayLoopyWriterTimeout = 0 + defer func() { + GoAwayLoopyWriterTimeout = origGoAwayLoopyTimeout + }() + writeHangSignal = make(chan struct{}) + ctx, _, _ := createClientServerConn(t, ConnectOptions{Dialer: hangingDialer}) + defer close(writeHangSignal) + select { + case <-writeHangSignal: + t.Errorf("error: channel closed too early.") + case <-ctx.Done(): + } +} + +func createClientServerConn(t *testing.T, connectOptions ConnectOptions) (context.Context, chan error, ClientTransport) { // Create a server. lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -2868,7 +2806,7 @@ func (s) TestClientCloseTimeoutOnHang(t *testing.T) { } }() - ct, err := NewClientTransport(ctx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{Dialer: hangingDialer}, func(GoAwayReason) {}) + ct, err := NewClientTransport(ctx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, connectOptions, func(GoAwayReason) {}) if err != nil { t.Fatalf("Error while creating client transport: %v", err) } @@ -2879,11 +2817,13 @@ func (s) TestClientCloseTimeoutOnHang(t *testing.T) { // Wait until server receives the headers and settings frame as part of greet. <-greetDone ct.Close(errors.New("manually closed by client")) - defer close(writeHangSignal) select { - case <-writeHangSignal: - t.Errorf("error: channel closed too early.") + case err := <-errorCh: + if err != nil { + t.Errorf("Error receiving the GOAWAY frame: %v", err) + } case <-ctx.Done(): + t.Errorf("Context timed out") } - + return ctx, errorCh, ct } From 44d097909b05b5d7e9909ce5108b1878ce29eb0d Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Fri, 5 Jul 2024 15:32:14 +0530 Subject: [PATCH 03/19] remove hangingConn logic --- internal/transport/transport_test.go | 85 ++++------------------------ 1 file changed, 11 insertions(+), 74 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index f0458abd4cc5..bcd0c37f88c0 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -92,8 +92,6 @@ const ( pingpong ) -const goAwayFrameSize = 42 - func (h *testStreamHandler) handleStreamAndNotify(s *Stream) { if h.notify == nil { return @@ -2662,62 +2660,11 @@ func TestConnectionError_Unwrap(t *testing.T) { // clientTransport.Close(), client sends a goaway to the server with the correct // error code and debug data. func (s) TestClientSendsAGoAwayFrame(t *testing.T) { - createClientServerConn(t, ConnectOptions{}) -} - -// writeHangSignal is used to hang the net.Conn Write for complete test duration. -var writeHangSignal chan struct{} - -// hangingConn is a net.Conn wrapper for testing, simulating hanging connections -// after a GOAWAY frame is sent, of which Write operations pause until explicitly signaled -// or a timeout occurs. -type hangingConn struct { - net.Conn -} - -func (hc *hangingConn) Read(b []byte) (n int, err error) { - n, err = hc.Conn.Read(b) - return n, err -} - -func (hc *hangingConn) Write(b []byte) (n int, err error) { - n, err = hc.Conn.Write(b) - if n == goAwayFrameSize { // GOAWAY frame - <-writeHangSignal - } - return n, err -} - -func (hc *hangingConn) Close() error { - return hc.Conn.Close() -} - -func (hc *hangingConn) LocalAddr() net.Addr { - return hc.Conn.LocalAddr() -} - -func (hc *hangingConn) RemoteAddr() net.Addr { - return hc.Conn.RemoteAddr() -} - -func (hc *hangingConn) SetDeadline(t time.Time) error { - return hc.Conn.SetDeadline(t) -} - -func (hc *hangingConn) SetReadDeadline(t time.Time) error { - return hc.Conn.SetReadDeadline(t) -} - -func (hc *hangingConn) SetWriteDeadline(t time.Time) error { - return hc.Conn.SetWriteDeadline(t) -} - -func hangingDialer(_ context.Context, addr string) (net.Conn, error) { - conn, err := net.Dial("tcp", addr) + errorCh := createClientServerLoggingGoAway(t) + err := <-errorCh if err != nil { - return nil, err + t.Errorf("Error receiving the GOAWAY frame: %v", err) } - return &hangingConn{Conn: conn}, nil } // TestClientCloseTimeoutOnHang verifies that in the event of a graceful @@ -2730,17 +2677,12 @@ func (s) TestClientCloseTimeoutOnHang(t *testing.T) { defer func() { GoAwayLoopyWriterTimeout = origGoAwayLoopyTimeout }() - writeHangSignal = make(chan struct{}) - ctx, _, _ := createClientServerConn(t, ConnectOptions{Dialer: hangingDialer}) - defer close(writeHangSignal) - select { - case <-writeHangSignal: - t.Errorf("error: channel closed too early.") - case <-ctx.Done(): - } + createClientServerLoggingGoAway(t) } -func createClientServerConn(t *testing.T, connectOptions ConnectOptions) (context.Context, chan error, ClientTransport) { +// createClientServerLoggingGoAway sets up a server(that expects a GOAWAY frame +// from the client.), and creates a ClientTransport . +func createClientServerLoggingGoAway(t *testing.T) chan error { // Create a server. lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -2755,6 +2697,7 @@ func createClientServerConn(t *testing.T, connectOptions ConnectOptions) (contex defer cancel() // Launch the server. go func() { + defer close(errorCh) sconn, err := lis.Accept() if err != nil { t.Errorf("Error while accepting: %v", err) @@ -2793,20 +2736,17 @@ func createClientServerConn(t *testing.T, connectOptions ConnectOptions) (contex goAwayFrame := fr if goAwayFrame.ErrCode == http2.ErrCodeNo { t.Logf("Received goAway frame from client") - close(errorCh) } else { errorCh <- fmt.Errorf("received unexpected goAway frame: %v", err) - close(errorCh) } return default: errorCh <- fmt.Errorf("server received a frame other than GOAWAY: %v", err) - close(errorCh) return } }() - ct, err := NewClientTransport(ctx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, connectOptions, func(GoAwayReason) {}) + ct, err := NewClientTransport(ctx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {}) if err != nil { t.Fatalf("Error while creating client transport: %v", err) } @@ -2818,12 +2758,9 @@ func createClientServerConn(t *testing.T, connectOptions ConnectOptions) (contex <-greetDone ct.Close(errors.New("manually closed by client")) select { - case err := <-errorCh: - if err != nil { - t.Errorf("Error receiving the GOAWAY frame: %v", err) - } + case <-errorCh: case <-ctx.Done(): t.Errorf("Context timed out") } - return ctx, errorCh, ct + return errorCh } From 73da5716a4c1ea3bcef254353319e1b5c025c4a0 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Mon, 8 Jul 2024 16:01:58 +0530 Subject: [PATCH 04/19] Undo refactor --- internal/transport/http2_client.go | 22 ++--- internal/transport/transport_test.go | 137 +++++++++++++++++++++------ 2 files changed, 119 insertions(+), 40 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 68e18c9cf623..9da4a1e38d2e 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -59,7 +59,7 @@ import ( // atomically. var clientConnectionCounter uint64 -var GoAwayLoopyWriterTimeout = time.Second +var goAwayLoopyWriterTimeout = 5 * time.Second var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool)) @@ -982,7 +982,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2. } // Close kicks off the shutdown process of the transport. This should be called -// only once on a transport. Once it is called, the transport should not be +// only once on transport. Once it is called, the transport should not be // accessed anymore. func (t *http2Client) Close(err error) { t.mu.Lock() @@ -1009,15 +1009,16 @@ func (t *http2Client) Close(err error) { } t.mu.Unlock() var st *status.Status - // Per HTTP/2 spec, a GOAWAY frame must be sent before closing the connection. - // See https://httpwg.org/specs/rfc7540.html#GOAWAY. It also waits for loopyWriter to - // be closed with a timer to avoid the indefinite blocking. + // Per HTTP/2 spec, a GOAWAY frame must be sent before closing the + // connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. It + // also waits for loopyWriter to be closed with a timer to avoid the + // long blocking in case the connection is half closed. t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err}) - timer := time.NewTimer(GoAwayLoopyWriterTimeout) select { case <-t.writerDone: - // Append info about previous goaway's if there were any, since this may be important - // for understanding the root cause for this connection to be closed. + // Append info about previous goaway's if there were any, since this + // may be important for understanding the root cause for this + // connection to be closed. _, goAwayDebugMessage := t.GetGoAwayReason() if len(goAwayDebugMessage) > 0 { st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage) @@ -1025,8 +1026,8 @@ func (t *http2Client) Close(err error) { } else { st = status.New(codes.Unavailable, err.Error()) } - case <-timer.C: - t.logger.Warningf("timeout waiting for the loopy writer to be closed.") + case <-time.After(goAwayLoopyWriterTimeout): + t.logger.Warningf("Failed to write a GOAWAY frame as part of connection close after %v. Giving up and closing the transport.", goAwayLoopyWriterTimeout) } t.cancel() t.conn.Close() @@ -1042,7 +1043,6 @@ func (t *http2Client) Close(err error) { } sh.HandleConn(t.ctx, connEnd) } - t.logger.Infof("Closed the client connection") } // GracefulClose sets the state to draining, which prevents new streams from diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index bcd0c37f88c0..b0367395da1b 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2424,7 +2424,7 @@ func (s) TestClientHandshakeInfo(t *testing.T) { TransportCredentials: creds, ChannelzParent: channelzSubChannel(t), } - tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {}) + tr, err := NewClientTransport(ctx, ctx, addr, copts, func(GoAwayReason) {}) if err != nil { t.Fatalf("NewClientTransport(): %v", err) } @@ -2465,7 +2465,7 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) { Dialer: dialer, ChannelzParent: channelzSubChannel(t), } - tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {}) + tr, err := NewClientTransport(ctx, ctx, addr, copts, func(GoAwayReason) {}) if err != nil { t.Fatalf("NewClientTransport(): %v", err) } @@ -2656,33 +2656,10 @@ func TestConnectionError_Unwrap(t *testing.T) { } } -// TestClientSendsAGoAwayFrame verifies that in the event of a graceful client transport shutdown, i.e., +// Test that in the event of a graceful client transport shutdown, i.e., // clientTransport.Close(), client sends a goaway to the server with the correct // error code and debug data. func (s) TestClientSendsAGoAwayFrame(t *testing.T) { - errorCh := createClientServerLoggingGoAway(t) - err := <-errorCh - if err != nil { - t.Errorf("Error receiving the GOAWAY frame: %v", err) - } -} - -// TestClientCloseTimeoutOnHang verifies that in the event of a graceful -// client transport shutdown, i.e., clientTransport.Close(), if the conn hung -// for LoopyWriterTimeout, client should still be close itself and should -// not wait for long. -func (s) TestClientCloseTimeoutOnHang(t *testing.T) { - origGoAwayLoopyTimeout := GoAwayLoopyWriterTimeout - GoAwayLoopyWriterTimeout = 0 - defer func() { - GoAwayLoopyWriterTimeout = origGoAwayLoopyTimeout - }() - createClientServerLoggingGoAway(t) -} - -// createClientServerLoggingGoAway sets up a server(that expects a GOAWAY frame -// from the client.), and creates a ClientTransport . -func createClientServerLoggingGoAway(t *testing.T) chan error { // Create a server. lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -2697,7 +2674,6 @@ func createClientServerLoggingGoAway(t *testing.T) chan error { defer cancel() // Launch the server. go func() { - defer close(errorCh) sconn, err := lis.Accept() if err != nil { t.Errorf("Error while accepting: %v", err) @@ -2736,17 +2712,20 @@ func createClientServerLoggingGoAway(t *testing.T) chan error { goAwayFrame := fr if goAwayFrame.ErrCode == http2.ErrCodeNo { t.Logf("Received goAway frame from client") + close(errorCh) } else { errorCh <- fmt.Errorf("received unexpected goAway frame: %v", err) + close(errorCh) } return default: errorCh <- fmt.Errorf("server received a frame other than GOAWAY: %v", err) + close(errorCh) return } }() - ct, err := NewClientTransport(ctx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {}) + ct, err := NewClientTransport(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {}) if err != nil { t.Fatalf("Error while creating client transport: %v", err) } @@ -2757,10 +2736,110 @@ func createClientServerLoggingGoAway(t *testing.T) chan error { // Wait until server receives the headers and settings frame as part of greet. <-greetDone ct.Close(errors.New("manually closed by client")) + t.Logf("Closed the client connection") + select { + case err := <-errorCh: + if err != nil { + t.Errorf("Error receiving the GOAWAY frame: %v", err) + } + case <-ctx.Done(): + t.Errorf("Context timed out") + } +} + +// Test that in the event of a graceful client transport shutdown +// , i.e., clientTransport.Close(), if the conn hung for +// LoopyWriterTimeout, client should still be close itself and should +// not wait for long. +func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { + // Override goAwayLoopyWriterTimeout to 0 so that we always + // time out while writing GOAWAY on client.Close(). This is + // equivalent to network hang scenario when client is + // failing to write GOAWAY frame. + origGoAwayLoopyTimeout := goAwayLoopyWriterTimeout + goAwayLoopyWriterTimeout = 0 + defer func() { + goAwayLoopyWriterTimeout = origGoAwayLoopyTimeout + }() + + // Create a server. + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening: %v", err) + } + defer lis.Close() + // serverGreetingDone is used to notify when server is done greeting the client. + serverGreetingDone := make(chan struct{}) + // errorCh verifies that desired GOAWAY not received by server + errorCh := make(chan error, 1) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Launch the server. + go func() { + defer close(errorCh) + conn, err := lis.Accept() + if err != nil { + t.Errorf("Error while accepting: %v", err) + } + defer conn.Close() + if _, err := io.ReadFull(conn, make([]byte, len(clientPreface))); err != nil { + t.Errorf("Error while reading client preface: %v", err) + return + } + framer := http2.NewFramer(conn, conn) + if err := framer.WriteSettings(); err != nil { + t.Errorf("Error while writing settings %v", err) + return + } + fr, _ := framer.ReadFrame() + if _, ok := fr.(*http2.SettingsFrame); !ok { + t.Errorf("Expected settings frame, got %T", fr) + } + fr, _ = framer.ReadFrame() + if fr, ok := fr.(*http2.SettingsFrame); !ok || !fr.IsAck() { + t.Errorf("Expected settings ACK frame, got %T", fr) + } + fr, _ = framer.ReadFrame() + if fr, ok := fr.(*http2.HeadersFrame); !ok || !fr.Flags.Has(http2.FlagHeadersEndHeaders) { + t.Errorf("Expected Headers frame with END_HEADERS frame, got %T", fr) + } + close(serverGreetingDone) + + frame, err := framer.ReadFrame() + if err != nil { + return + } + switch fr := frame.(type) { + case *http2.GoAwayFrame: + if fr.ErrCode == http2.ErrCodeNo { + t.Logf("Received goAway frame from client") + return + } + errorCh <- fmt.Errorf("received unexpected goAway frame: %v", err) + default: + errorCh <- fmt.Errorf("server received a frame other than GOAWAY: %v", err) + return + } + }() + + ct, err := NewClientTransport(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {}) + if err != nil { + t.Fatalf("Error while creating client transport: %v", err) + } + _, err = ct.NewStream(ctx, &CallHdr{}) + if err != nil { + t.Fatalf("failed to open stream: %v", err) + } + // Wait until server receives the headers and settings frame as part of greet. + <-serverGreetingDone + // ct.Close will try to send the GOAWAY to server and will fail writing + // GOAWAY and will eventually close the loopyWriter as + // goAwayLoopyWriterTimeout (time out for writing GOAWAY) is set to zero, which is + // equivalent to network hang scenario. + ct.Close(errors.New("manually closed by client")) select { case <-errorCh: case <-ctx.Done(): t.Errorf("Context timed out") } - return errorCh } From b083d8c452f2b07066d74e803a597d20e8f9cd73 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Mon, 15 Jul 2024 14:53:01 +0530 Subject: [PATCH 05/19] resolved easwar's comments: refactored server code in test and comments --- internal/transport/http2_client.go | 5 +-- internal/transport/transport_test.go | 48 ++++++++-------------------- 2 files changed, 17 insertions(+), 36 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 9da4a1e38d2e..0a33943929d8 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -982,7 +982,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2. } // Close kicks off the shutdown process of the transport. This should be called -// only once on transport. Once it is called, the transport should not be +// only once on a transport. Once it is called, the transport should not be // accessed anymore. func (t *http2Client) Close(err error) { t.mu.Lock() @@ -1027,7 +1027,8 @@ func (t *http2Client) Close(err error) { st = status.New(codes.Unavailable, err.Error()) } case <-time.After(goAwayLoopyWriterTimeout): - t.logger.Warningf("Failed to write a GOAWAY frame as part of connection close after %v. Giving up and closing the transport.", goAwayLoopyWriterTimeout) + st = status.New(codes.Unavailable, err.Error()) + t.logger.Warningf("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout) } t.cancel() t.conn.Close() diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index b0367395da1b..aea77477ddfa 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2748,14 +2748,13 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { } // Test that in the event of a graceful client transport shutdown -// , i.e., clientTransport.Close(), if the conn hung for -// LoopyWriterTimeout, client should still be close itself and should -// not wait for long. +// , i.e., clientTransport.Close(), if the GOAWAY write is not +// finished within specified time due to network hang, client +// should still close without waiting for too long. func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { - // Override goAwayLoopyWriterTimeout to 0 so that we always - // time out while writing GOAWAY on client.Close(). This is - // equivalent to network hang scenario when client is - // failing to write GOAWAY frame. + // Override timer for writing GOAWAY to 0 so that the connection write + // always times out. It is equivalent of real network hang when conn + // write for goaway doesn't finish in specified deadline origGoAwayLoopyTimeout := goAwayLoopyWriterTimeout goAwayLoopyWriterTimeout = 0 defer func() { @@ -2768,13 +2767,13 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { t.Fatalf("Error while listening: %v", err) } defer lis.Close() - // serverGreetingDone is used to notify when server is done greeting the client. - serverGreetingDone := make(chan struct{}) // errorCh verifies that desired GOAWAY not received by server errorCh := make(chan error, 1) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - // Launch the server. + // Launch the server and handle HTTP/2 connections, specifically + // focusing on the reception and interpretation of GOAWAY frames + // sent from client. go func() { defer close(errorCh) conn, err := lis.Accept() @@ -2782,28 +2781,11 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { t.Errorf("Error while accepting: %v", err) } defer conn.Close() - if _, err := io.ReadFull(conn, make([]byte, len(clientPreface))); err != nil { - t.Errorf("Error while reading client preface: %v", err) - return - } framer := http2.NewFramer(conn, conn) if err := framer.WriteSettings(); err != nil { t.Errorf("Error while writing settings %v", err) return } - fr, _ := framer.ReadFrame() - if _, ok := fr.(*http2.SettingsFrame); !ok { - t.Errorf("Expected settings frame, got %T", fr) - } - fr, _ = framer.ReadFrame() - if fr, ok := fr.(*http2.SettingsFrame); !ok || !fr.IsAck() { - t.Errorf("Expected settings ACK frame, got %T", fr) - } - fr, _ = framer.ReadFrame() - if fr, ok := fr.(*http2.HeadersFrame); !ok || !fr.Flags.Has(http2.FlagHeadersEndHeaders) { - t.Errorf("Expected Headers frame with END_HEADERS frame, got %T", fr) - } - close(serverGreetingDone) frame, err := framer.ReadFrame() if err != nil { @@ -2830,16 +2812,14 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { if err != nil { t.Fatalf("failed to open stream: %v", err) } - // Wait until server receives the headers and settings frame as part of greet. - <-serverGreetingDone - // ct.Close will try to send the GOAWAY to server and will fail writing - // GOAWAY and will eventually close the loopyWriter as - // goAwayLoopyWriterTimeout (time out for writing GOAWAY) is set to zero, which is - // equivalent to network hang scenario. + // ct.Close will try to send the GOAWAY to server but conn. + // Write will time out due to goAwayLoopyWriterTimeout being + // 0. It is equivalent of a network hang when GOAWAY doesn't + // finish within specified deadline. ct.Close(errors.New("manually closed by client")) select { case <-errorCh: case <-ctx.Done(): - t.Errorf("Context timed out") + t.Errorf("timeout waiting for client Close(): Context timed out") } } From 2525c9bf54bf3baad46336d9ce8519f4ea8bdaff Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Thu, 18 Jul 2024 11:32:06 +0530 Subject: [PATCH 06/19] fixed easwars comment: put ct.Close in goroutine --- internal/transport/transport_test.go | 45 +++++++++++----------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index aea77477ddfa..ae1cbda411ff 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2767,15 +2767,9 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { t.Fatalf("Error while listening: %v", err) } defer lis.Close() - // errorCh verifies that desired GOAWAY not received by server - errorCh := make(chan error, 1) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // Launch the server and handle HTTP/2 connections, specifically - // focusing on the reception and interpretation of GOAWAY frames - // sent from client. + + // Launch the server and allow HTTP/2 connections go func() { - defer close(errorCh) conn, err := lis.Accept() if err != nil { t.Errorf("Error while accepting: %v", err) @@ -2786,24 +2780,14 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { t.Errorf("Error while writing settings %v", err) return } - - frame, err := framer.ReadFrame() + _, err = framer.ReadFrame() if err != nil { return } - switch fr := frame.(type) { - case *http2.GoAwayFrame: - if fr.ErrCode == http2.ErrCodeNo { - t.Logf("Received goAway frame from client") - return - } - errorCh <- fmt.Errorf("received unexpected goAway frame: %v", err) - default: - errorCh <- fmt.Errorf("server received a frame other than GOAWAY: %v", err) - return - } }() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() ct, err := NewClientTransport(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {}) if err != nil { t.Fatalf("Error while creating client transport: %v", err) @@ -2812,13 +2796,20 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { if err != nil { t.Fatalf("failed to open stream: %v", err) } - // ct.Close will try to send the GOAWAY to server but conn. - // Write will time out due to goAwayLoopyWriterTimeout being - // 0. It is equivalent of a network hang when GOAWAY doesn't - // finish within specified deadline. - ct.Close(errors.New("manually closed by client")) + + // ctClosed verifies that clientTransport is closed successfully + ctClosed := make(chan error, 1) + go func() { + // ct.Close will try to send the GOAWAY to server but conn. + // Write will time out due to goAwayLoopyWriterTimeout being + // 0. It is equivalent of a network hang when GOAWAY doesn't + // finish within specified deadline. + ct.Close(errors.New("manually closed by client")) + close(ctClosed) + }() + select { - case <-errorCh: + case <-ctClosed: case <-ctx.Done(): t.Errorf("timeout waiting for client Close(): Context timed out") } From cd7b716693e27c9ee0c01b8d4b2a58282ccc5ade Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Thu, 25 Jul 2024 12:06:59 +0530 Subject: [PATCH 07/19] update test method with easwar's code --- internal/transport/transport_test.go | 57 ++++++++-------------------- 1 file changed, 15 insertions(+), 42 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index ae1cbda411ff..2a8877f45624 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2747,10 +2747,10 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { } } -// Test that in the event of a graceful client transport shutdown -// , i.e., clientTransport.Close(), if the GOAWAY write is not -// finished within specified time due to network hang, client -// should still close without waiting for too long. +// Tests the scenario where a client transport is closed and writing of the +// GOAWAY frame as part of the close does not complete because of a network +// hang. The test verifies that the client transport is closed without waiting +// for too long. func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { // Override timer for writing GOAWAY to 0 so that the connection write // always times out. It is equivalent of real network hang when conn @@ -2761,49 +2761,22 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { goAwayLoopyWriterTimeout = origGoAwayLoopyTimeout }() - // Create a server. - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error while listening: %v", err) - } - defer lis.Close() - - // Launch the server and allow HTTP/2 connections - go func() { - conn, err := lis.Accept() - if err != nil { - t.Errorf("Error while accepting: %v", err) - } - defer conn.Close() - framer := http2.NewFramer(conn, conn) - if err := framer.WriteSettings(); err != nil { - t.Errorf("Error while writing settings %v", err) - return - } - _, err = framer.ReadFrame() - if err != nil { - return - } - }() + server, ct, cancel := setUp(t, 0, normal) + defer cancel() + defer server.stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - ct, err := NewClientTransport(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {}) - if err != nil { - t.Fatalf("Error while creating client transport: %v", err) - } - _, err = ct.NewStream(ctx, &CallHdr{}) - if err != nil { - t.Fatalf("failed to open stream: %v", err) + if _, err := ct.NewStream(ctx, &CallHdr{}); err != nil { + t.Fatalf("Failed to open stream: %v", err) } - // ctClosed verifies that clientTransport is closed successfully - ctClosed := make(chan error, 1) + ctClosed := make(chan struct{}) go func() { - // ct.Close will try to send the GOAWAY to server but conn. - // Write will time out due to goAwayLoopyWriterTimeout being - // 0. It is equivalent of a network hang when GOAWAY doesn't - // finish within specified deadline. + // ct.Close will try to send a GOAWAY frame to the server but conn.Write + // will time out due to goAwayLoopyWriterTimeout being 0. It is + // equivalent of a network hang when GOAWAY doesn't finish within + // specified deadline. ct.Close(errors.New("manually closed by client")) close(ctClosed) }() @@ -2811,6 +2784,6 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { select { case <-ctClosed: case <-ctx.Done(): - t.Errorf("timeout waiting for client Close(): Context timed out") + t.Errorf("Timeout waiting for Close() to complete") } } From bc66ca9803ff4bf94291329b95c5f05b90a00db5 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 25 Jul 2024 19:23:28 +0000 Subject: [PATCH 08/19] don't use time.After() in non-test code to make vet happy --- internal/transport/http2_client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 0a33943929d8..50abf8741bd9 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1008,12 +1008,15 @@ func (t *http2Client) Close(err error) { t.kpDormancyCond.Signal() } t.mu.Unlock() + var st *status.Status // Per HTTP/2 spec, a GOAWAY frame must be sent before closing the // connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. It // also waits for loopyWriter to be closed with a timer to avoid the // long blocking in case the connection is half closed. t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err}) + timer := time.NewTimer(goAwayLoopyWriterTimeout) + defer timer.Stop() select { case <-t.writerDone: // Append info about previous goaway's if there were any, since this @@ -1026,7 +1029,7 @@ func (t *http2Client) Close(err error) { } else { st = status.New(codes.Unavailable, err.Error()) } - case <-time.After(goAwayLoopyWriterTimeout): + case <-timer.C: st = status.New(codes.Unavailable, err.Error()) t.logger.Warningf("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout) } From 8ffe575f79aad8728f180f0bf199fa816d816be6 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Mon, 5 Aug 2024 11:58:41 +0530 Subject: [PATCH 09/19] fix: replace Warning with Infof --- internal/transport/http2_client.go | 5 +++-- internal/transport/transport_test.go | 16 +--------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 50abf8741bd9..0ed7f7a1f17f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1013,7 +1013,8 @@ func (t *http2Client) Close(err error) { // Per HTTP/2 spec, a GOAWAY frame must be sent before closing the // connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. It // also waits for loopyWriter to be closed with a timer to avoid the - // long blocking in case the connection is half closed. + // long blocking in case the connection is blackholed, i.e. TCP is + // just stuck. t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err}) timer := time.NewTimer(goAwayLoopyWriterTimeout) defer timer.Stop() @@ -1031,7 +1032,7 @@ func (t *http2Client) Close(err error) { } case <-timer.C: st = status.New(codes.Unavailable, err.Error()) - t.logger.Warningf("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout) + t.logger.Infof("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout) } t.cancel() t.conn.Close() diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 2a8877f45624..b41917fc1e44 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2771,19 +2771,5 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { t.Fatalf("Failed to open stream: %v", err) } - ctClosed := make(chan struct{}) - go func() { - // ct.Close will try to send a GOAWAY frame to the server but conn.Write - // will time out due to goAwayLoopyWriterTimeout being 0. It is - // equivalent of a network hang when GOAWAY doesn't finish within - // specified deadline. - ct.Close(errors.New("manually closed by client")) - close(ctClosed) - }() - - select { - case <-ctClosed: - case <-ctx.Done(): - t.Errorf("Timeout waiting for Close() to complete") - } + ct.Close(errors.New("manually closed by client")) } From a68454221e9f264af2b5b2aba911911089bd673e Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Tue, 6 Aug 2024 12:55:46 +0530 Subject: [PATCH 10/19] fix: add hanging conn dialer --- internal/transport/transport_test.go | 75 ++++++++++++++++++++++++++-- 1 file changed, 71 insertions(+), 4 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index b41917fc1e44..0c57ad45bf38 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -427,8 +427,12 @@ func setUpServerOnly(t *testing.T, port int, sc *ServerConfig, ht hType) *server return server } -func setUp(t *testing.T, port int, ht hType) (*server, *http2Client, func()) { - return setUpWithOptions(t, port, &ServerConfig{}, ht, ConnectOptions{}) +func setUp(t *testing.T, port int, ht hType, options ...ConnectOptions) (*server, *http2Client, func()) { + var copts = ConnectOptions{} + if len(options) > 0 { + copts = options[0] + } + return setUpWithOptions(t, port, &ServerConfig{}, ht, copts) } func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) { @@ -2747,6 +2751,65 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { } } +// serverGreetingDone verifies that client-server setup is complete +// for the test. +var serverGreetingDone chan struct{} + +// hangingConn is a net.Conn wrapper for testing, simulating hanging connections +// after a GOAWAY frame is sent, of which Write operations pause until explicitly signaled +// or a timeout occurs. +type hangingConn struct { + net.Conn +} + +func (hc *hangingConn) Read(b []byte) (n int, err error) { + n, err = hc.Conn.Read(b) + return n, err +} + +func (hc *hangingConn) Write(b []byte) (n int, err error) { + n, err = hc.Conn.Write(b) + if serverGreetingDone != nil { + // Wait for client-server conn to set up + <-serverGreetingDone + // Add a delay which is more than goAwayLoopyWriterTimeout + time.Sleep(2 * time.Second) + } + return n, err +} + +func (hc *hangingConn) Close() error { + return hc.Conn.Close() +} + +func (hc *hangingConn) LocalAddr() net.Addr { + return hc.Conn.LocalAddr() +} + +func (hc *hangingConn) RemoteAddr() net.Addr { + return hc.Conn.RemoteAddr() +} + +func (hc *hangingConn) SetDeadline(t time.Time) error { + return hc.Conn.SetDeadline(t) +} + +func (hc *hangingConn) SetReadDeadline(t time.Time) error { + return hc.Conn.SetReadDeadline(t) +} + +func (hc *hangingConn) SetWriteDeadline(t time.Time) error { + return hc.Conn.SetWriteDeadline(t) +} + +func hangingDialer(_ context.Context, addr string) (net.Conn, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + return &hangingConn{Conn: conn}, nil +} + // Tests the scenario where a client transport is closed and writing of the // GOAWAY frame as part of the close does not complete because of a network // hang. The test verifies that the client transport is closed without waiting @@ -2756,12 +2819,16 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { // always times out. It is equivalent of real network hang when conn // write for goaway doesn't finish in specified deadline origGoAwayLoopyTimeout := goAwayLoopyWriterTimeout - goAwayLoopyWriterTimeout = 0 + goAwayLoopyWriterTimeout = time.Second defer func() { goAwayLoopyWriterTimeout = origGoAwayLoopyTimeout }() - server, ct, cancel := setUp(t, 0, normal) + server, ct, cancel := setUp(t, 0, normal, ConnectOptions{Dialer: hangingDialer}) + serverGreetingDone = make(chan struct{}) + // Acknowledge that client-server greeting is done + serverGreetingDone <- struct{}{} + defer close(serverGreetingDone) defer cancel() defer server.stop() From 59cc2300553092f20df18db908bafb08542adfd0 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Tue, 6 Aug 2024 18:16:23 +0530 Subject: [PATCH 11/19] fixed race condition in test --- internal/transport/transport_test.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 0c57ad45bf38..f3e380d2271f 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -32,6 +32,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -427,6 +428,10 @@ func setUpServerOnly(t *testing.T, port int, sc *ServerConfig, ht hType) *server return server } +// isGreetingDone verifies that client-server setup is complete +// for the test. +var isGreetingsDone = atomic.Bool{} + func setUp(t *testing.T, port int, ht hType, options ...ConnectOptions) (*server, *http2Client, func()) { var copts = ConnectOptions{} if len(options) > 0 { @@ -446,6 +451,7 @@ func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts cancel() // Do not cancel in success path. t.Fatalf("failed to create transport: %v", connErr) } + isGreetingsDone.Store(true) return server, ct.(*http2Client), cancel } @@ -2751,10 +2757,6 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { } } -// serverGreetingDone verifies that client-server setup is complete -// for the test. -var serverGreetingDone chan struct{} - // hangingConn is a net.Conn wrapper for testing, simulating hanging connections // after a GOAWAY frame is sent, of which Write operations pause until explicitly signaled // or a timeout occurs. @@ -2769,9 +2771,7 @@ func (hc *hangingConn) Read(b []byte) (n int, err error) { func (hc *hangingConn) Write(b []byte) (n int, err error) { n, err = hc.Conn.Write(b) - if serverGreetingDone != nil { - // Wait for client-server conn to set up - <-serverGreetingDone + if isGreetingsDone.Load() == true { // Add a delay which is more than goAwayLoopyWriterTimeout time.Sleep(2 * time.Second) } @@ -2824,11 +2824,9 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { goAwayLoopyWriterTimeout = origGoAwayLoopyTimeout }() + isGreetingsDone = atomic.Bool{} + server, ct, cancel := setUp(t, 0, normal, ConnectOptions{Dialer: hangingDialer}) - serverGreetingDone = make(chan struct{}) - // Acknowledge that client-server greeting is done - serverGreetingDone <- struct{}{} - defer close(serverGreetingDone) defer cancel() defer server.stop() From b9a16d41a1373f5b7904c75690122e9b424d509a Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Fri, 9 Aug 2024 22:13:12 +0530 Subject: [PATCH 12/19] Decouple error logic with timer logic waiting for loopy --- internal/transport/http2_client.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 0ed7f7a1f17f..a26698ba08a9 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1009,7 +1009,6 @@ func (t *http2Client) Close(err error) { } t.mu.Unlock() - var st *status.Status // Per HTTP/2 spec, a GOAWAY frame must be sent before closing the // connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. It // also waits for loopyWriter to be closed with a timer to avoid the @@ -1019,24 +1018,24 @@ func (t *http2Client) Close(err error) { timer := time.NewTimer(goAwayLoopyWriterTimeout) defer timer.Stop() select { - case <-t.writerDone: - // Append info about previous goaway's if there were any, since this - // may be important for understanding the root cause for this - // connection to be closed. - _, goAwayDebugMessage := t.GetGoAwayReason() - if len(goAwayDebugMessage) > 0 { - st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage) - err = st.Err() - } else { - st = status.New(codes.Unavailable, err.Error()) - } + case <-t.writerDone: // success case <-timer.C: - st = status.New(codes.Unavailable, err.Error()) t.logger.Infof("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout) } t.cancel() t.conn.Close() channelz.RemoveEntry(t.channelz.ID) + // Append info about previous goaways if there were any, since this may be important + // for understanding the root cause for this connection to be closed. + _, goAwayDebugMessage := t.GetGoAwayReason() + + var st *status.Status + if len(goAwayDebugMessage) > 0 { + st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage) + err = st.Err() + } else { + st = status.New(codes.Unavailable, err.Error()) + } // Notify all active streams. for _, s := range streams { From 3ec2ccbf888521af2ebcc410e342731c84b3db12 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Fri, 9 Aug 2024 22:29:36 +0530 Subject: [PATCH 13/19] Remove redundant overridden methods of hangingConn --- internal/transport/transport_test.go | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index f3e380d2271f..de008b833b34 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2778,30 +2778,6 @@ func (hc *hangingConn) Write(b []byte) (n int, err error) { return n, err } -func (hc *hangingConn) Close() error { - return hc.Conn.Close() -} - -func (hc *hangingConn) LocalAddr() net.Addr { - return hc.Conn.LocalAddr() -} - -func (hc *hangingConn) RemoteAddr() net.Addr { - return hc.Conn.RemoteAddr() -} - -func (hc *hangingConn) SetDeadline(t time.Time) error { - return hc.Conn.SetDeadline(t) -} - -func (hc *hangingConn) SetReadDeadline(t time.Time) error { - return hc.Conn.SetReadDeadline(t) -} - -func (hc *hangingConn) SetWriteDeadline(t time.Time) error { - return hc.Conn.SetWriteDeadline(t) -} - func hangingDialer(_ context.Context, addr string) (net.Conn, error) { conn, err := net.Dial("tcp", addr) if err != nil { From 4a6bcf501f49b3708d7111672bc838457632b30a Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Sat, 10 Aug 2024 10:42:55 +0530 Subject: [PATCH 14/19] fixed nits --- internal/transport/transport_test.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index de008b833b34..3342f817ab0a 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -430,7 +430,7 @@ func setUpServerOnly(t *testing.T, port int, sc *ServerConfig, ht hType) *server // isGreetingDone verifies that client-server setup is complete // for the test. -var isGreetingsDone = atomic.Bool{} +var isGreetingDone = atomic.Bool{} func setUp(t *testing.T, port int, ht hType, options ...ConnectOptions) (*server, *http2Client, func()) { var copts = ConnectOptions{} @@ -451,7 +451,7 @@ func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts cancel() // Do not cancel in success path. t.Fatalf("failed to create transport: %v", connErr) } - isGreetingsDone.Store(true) + isGreetingDone.Store(true) return server, ct.(*http2Client), cancel } @@ -2762,6 +2762,7 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { // or a timeout occurs. type hangingConn struct { net.Conn + hangConn chan struct{} } func (hc *hangingConn) Read(b []byte) (n int, err error) { @@ -2771,9 +2772,14 @@ func (hc *hangingConn) Read(b []byte) (n int, err error) { func (hc *hangingConn) Write(b []byte) (n int, err error) { n, err = hc.Conn.Write(b) - if isGreetingsDone.Load() == true { - // Add a delay which is more than goAwayLoopyWriterTimeout - time.Sleep(2 * time.Second) + if isGreetingDone.Load() == true { + // Hang the Write for more than goAwayLoopyWriterTimeout + timer := time.NewTimer(time.Millisecond * 5) + defer timer.Stop() + select { + case <-hc.hangConn: + case <-timer.C: + } } return n, err } @@ -2783,7 +2789,7 @@ func hangingDialer(_ context.Context, addr string) (net.Conn, error) { if err != nil { return nil, err } - return &hangingConn{Conn: conn}, nil + return &hangingConn{Conn: conn, hangConn: make(chan struct{})}, nil } // Tests the scenario where a client transport is closed and writing of the @@ -2795,12 +2801,12 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { // always times out. It is equivalent of real network hang when conn // write for goaway doesn't finish in specified deadline origGoAwayLoopyTimeout := goAwayLoopyWriterTimeout - goAwayLoopyWriterTimeout = time.Second + goAwayLoopyWriterTimeout = time.Millisecond defer func() { goAwayLoopyWriterTimeout = origGoAwayLoopyTimeout }() - isGreetingsDone = atomic.Bool{} + isGreetingDone.Store(false) server, ct, cancel := setUp(t, 0, normal, ConnectOptions{Dialer: hangingDialer}) defer cancel() From 205fbdf9d596a875cf60f4dbc2377e1008085502 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Tue, 13 Aug 2024 15:37:40 +0530 Subject: [PATCH 15/19] Make isGreetingDone part of hangingConn struct --- internal/transport/transport_test.go | 60 ++++++++++++++-------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 3342f817ab0a..c22d96df8e12 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -428,16 +428,8 @@ func setUpServerOnly(t *testing.T, port int, sc *ServerConfig, ht hType) *server return server } -// isGreetingDone verifies that client-server setup is complete -// for the test. -var isGreetingDone = atomic.Bool{} - -func setUp(t *testing.T, port int, ht hType, options ...ConnectOptions) (*server, *http2Client, func()) { - var copts = ConnectOptions{} - if len(options) > 0 { - copts = options[0] - } - return setUpWithOptions(t, port, &ServerConfig{}, ht, copts) +func setUp(t *testing.T, port int, ht hType) (*server, *http2Client, func()) { + return setUpWithOptions(t, port, &ServerConfig{}, ht, ConnectOptions{}) } func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) { @@ -451,7 +443,6 @@ func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts cancel() // Do not cancel in success path. t.Fatalf("failed to create transport: %v", connErr) } - isGreetingDone.Store(true) return server, ct.(*http2Client), cancel } @@ -2758,21 +2749,17 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { } // hangingConn is a net.Conn wrapper for testing, simulating hanging connections -// after a GOAWAY frame is sent, of which Write operations pause until explicitly signaled -// or a timeout occurs. +// after a GOAWAY frame is sent, of which Write operations pause until explicitly +// signaled or a timeout occurs. type hangingConn struct { net.Conn - hangConn chan struct{} -} - -func (hc *hangingConn) Read(b []byte) (n int, err error) { - n, err = hc.Conn.Read(b) - return n, err + hangConn chan struct{} + isGreetingDone *atomic.Bool } func (hc *hangingConn) Write(b []byte) (n int, err error) { n, err = hc.Conn.Write(b) - if isGreetingDone.Load() == true { + if hc.isGreetingDone.Load() == true { // Hang the Write for more than goAwayLoopyWriterTimeout timer := time.NewTimer(time.Millisecond * 5) defer timer.Stop() @@ -2784,14 +2771,6 @@ func (hc *hangingConn) Write(b []byte) (n int, err error) { return n, err } -func hangingDialer(_ context.Context, addr string) (net.Conn, error) { - conn, err := net.Dial("tcp", addr) - if err != nil { - return nil, err - } - return &hangingConn{Conn: conn, hangConn: make(chan struct{})}, nil -} - // Tests the scenario where a client transport is closed and writing of the // GOAWAY frame as part of the close does not complete because of a network // hang. The test verifies that the client transport is closed without waiting @@ -2806,9 +2785,30 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { goAwayLoopyWriterTimeout = origGoAwayLoopyTimeout }() - isGreetingDone.Store(false) + // Create the server set up. + connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) + server := setUpServerOnly(t, 0, &ServerConfig{}, normal) + addr := resolver.Address{Addr: "localhost:" + server.port} + isGreetingDone := &atomic.Bool{} + dialer := func(_ context.Context, addr string) (net.Conn, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + isGreetingDone.Store(false) + return &hangingConn{Conn: conn, hangConn: make(chan struct{}), isGreetingDone: isGreetingDone}, nil + } + copts := ConnectOptions{Dialer: dialer} + copts.ChannelzParent = channelzSubChannel(t) + + // Create client transport with custom dialer + ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {}) + if connErr != nil { + cancel() // Do not cancel in success path. + t.Fatalf("failed to create transport: %v", connErr) + } + isGreetingDone.Store(true) - server, ct, cancel := setUp(t, 0, normal, ConnectOptions{Dialer: hangingDialer}) defer cancel() defer server.stop() From a7a7bb573011adb4deff4eeee34cfae4ff70cf5d Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Wed, 14 Aug 2024 16:19:59 +0530 Subject: [PATCH 16/19] fixed nits --- internal/transport/transport_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index c22d96df8e12..7f24a094bbc4 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2753,13 +2753,13 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { // signaled or a timeout occurs. type hangingConn struct { net.Conn - hangConn chan struct{} - isGreetingDone *atomic.Bool + hangConn chan struct{} + startHanging *atomic.Bool } func (hc *hangingConn) Write(b []byte) (n int, err error) { n, err = hc.Conn.Write(b) - if hc.isGreetingDone.Load() == true { + if hc.startHanging.Load() == true { // Hang the Write for more than goAwayLoopyWriterTimeout timer := time.NewTimer(time.Millisecond * 5) defer timer.Stop() @@ -2795,8 +2795,7 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { if err != nil { return nil, err } - isGreetingDone.Store(false) - return &hangingConn{Conn: conn, hangConn: make(chan struct{}), isGreetingDone: isGreetingDone}, nil + return &hangingConn{Conn: conn, hangConn: make(chan struct{}), startHanging: isGreetingDone}, nil } copts := ConnectOptions{Dialer: dialer} copts.ChannelzParent = channelzSubChannel(t) From 9636c56603031c874edf28ff55df532607964633 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Thu, 15 Aug 2024 18:58:38 +0530 Subject: [PATCH 17/19] Set write deadline for net.Conn --- internal/transport/http2_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index a26698ba08a9..42a53e62d8f1 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -985,6 +985,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2. // only once on a transport. Once it is called, the transport should not be // accessed anymore. func (t *http2Client) Close(err error) { + t.conn.SetWriteDeadline(time.Now().Add(time.Second * 10)) t.mu.Lock() // Make sure we only close once. if t.state == closing { From aebe562452f515cbb45f2eb0337df40f961056ce Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Fri, 16 Aug 2024 12:27:18 +0530 Subject: [PATCH 18/19] remove timer for hc.Write --- internal/transport/transport_test.go | 48 ++++++++-------------------- 1 file changed, 14 insertions(+), 34 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 7f24a094bbc4..6fcdd60d5594 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -32,7 +32,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "testing" "time" @@ -59,6 +58,8 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } +const goAwayFrameSize = 42 + var ( expectedRequest = []byte("ping") expectedResponse = []byte("pong") @@ -2753,24 +2754,25 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { // signaled or a timeout occurs. type hangingConn struct { net.Conn - hangConn chan struct{} - startHanging *atomic.Bool + hangConn chan struct{} } func (hc *hangingConn) Write(b []byte) (n int, err error) { n, err = hc.Conn.Write(b) - if hc.startHanging.Load() == true { - // Hang the Write for more than goAwayLoopyWriterTimeout - timer := time.NewTimer(time.Millisecond * 5) - defer timer.Stop() - select { - case <-hc.hangConn: - case <-timer.C: - } + if n == goAwayFrameSize { // hang the conn after the goAway is received + <-hc.hangConn } return n, err } +func hangingDialer(_ context.Context, addr string) (net.Conn, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + return &hangingConn{Conn: conn, hangConn: make(chan struct{})}, nil +} + // Tests the scenario where a client transport is closed and writing of the // GOAWAY frame as part of the close does not complete because of a network // hang. The test verifies that the client transport is closed without waiting @@ -2786,28 +2788,7 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { }() // Create the server set up. - connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) - server := setUpServerOnly(t, 0, &ServerConfig{}, normal) - addr := resolver.Address{Addr: "localhost:" + server.port} - isGreetingDone := &atomic.Bool{} - dialer := func(_ context.Context, addr string) (net.Conn, error) { - conn, err := net.Dial("tcp", addr) - if err != nil { - return nil, err - } - return &hangingConn{Conn: conn, hangConn: make(chan struct{}), startHanging: isGreetingDone}, nil - } - copts := ConnectOptions{Dialer: dialer} - copts.ChannelzParent = channelzSubChannel(t) - - // Create client transport with custom dialer - ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {}) - if connErr != nil { - cancel() // Do not cancel in success path. - t.Fatalf("failed to create transport: %v", connErr) - } - isGreetingDone.Store(true) - + server, ct, cancel := setUpWithOptions(t, 0, &ServerConfig{}, normal, ConnectOptions{Dialer: hangingDialer}) defer cancel() defer server.stop() @@ -2816,6 +2797,5 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { if _, err := ct.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("Failed to open stream: %v", err) } - ct.Close(errors.New("manually closed by client")) } From fad99bae49149d10b77447829a0afedbfee73af2 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Fri, 16 Aug 2024 23:04:56 +0530 Subject: [PATCH 19/19] Revert the atomic boolean for hangingConn.Write() --- internal/transport/transport_test.go | 39 ++++++++++++++++++---------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 6fcdd60d5594..36739490ed30 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -32,6 +32,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -58,8 +59,6 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -const goAwayFrameSize = 42 - var ( expectedRequest = []byte("ping") expectedResponse = []byte("pong") @@ -2754,25 +2753,18 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { // signaled or a timeout occurs. type hangingConn struct { net.Conn - hangConn chan struct{} + hangConn chan struct{} + startHanging *atomic.Bool } func (hc *hangingConn) Write(b []byte) (n int, err error) { n, err = hc.Conn.Write(b) - if n == goAwayFrameSize { // hang the conn after the goAway is received + if hc.startHanging.Load() { <-hc.hangConn } return n, err } -func hangingDialer(_ context.Context, addr string) (net.Conn, error) { - conn, err := net.Dial("tcp", addr) - if err != nil { - return nil, err - } - return &hangingConn{Conn: conn, hangConn: make(chan struct{})}, nil -} - // Tests the scenario where a client transport is closed and writing of the // GOAWAY frame as part of the close does not complete because of a network // hang. The test verifies that the client transport is closed without waiting @@ -2788,14 +2780,35 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { }() // Create the server set up. - server, ct, cancel := setUpWithOptions(t, 0, &ServerConfig{}, normal, ConnectOptions{Dialer: hangingDialer}) + connectCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + server := setUpServerOnly(t, 0, &ServerConfig{}, normal) defer server.stop() + addr := resolver.Address{Addr: "localhost:" + server.port} + isGreetingDone := &atomic.Bool{} + hangConn := make(chan struct{}) + defer close(hangConn) + dialer := func(_ context.Context, addr string) (net.Conn, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + return &hangingConn{Conn: conn, hangConn: hangConn, startHanging: isGreetingDone}, nil + } + copts := ConnectOptions{Dialer: dialer} + copts.ChannelzParent = channelzSubChannel(t) + // Create client transport with custom dialer + ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {}) + if connErr != nil { + t.Fatalf("failed to create transport: %v", connErr) + } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := ct.NewStream(ctx, &CallHdr{}); err != nil { t.Fatalf("Failed to open stream: %v", err) } + + isGreetingDone.Store(true) ct.Close(errors.New("manually closed by client")) }