From 08555a9f9d4ef5968fc58adc35a27e342a4e4210 Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Mon, 6 Nov 2023 11:59:57 -0500 Subject: [PATCH 1/5] Debug falky TestStreamForServer test --- connect_ext_test.go | 22 ++++++++++++---------- duplex_http_call.go | 6 ++++++ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/connect_ext_test.go b/connect_ext_test.go index af541259..c53eb204 100644 --- a/connect_ext_test.go +++ b/connect_ext_test.go @@ -1510,7 +1510,7 @@ func TestBidiStreamServerSendsFirstMessage(t *testing.T) { func TestStreamForServer(t *testing.T) { t.Parallel() - newPingClient := func(pingServer pingv1connect.PingServiceHandler) pingv1connect.PingServiceClient { + newPingClient := func(t *testing.T, pingServer pingv1connect.PingServiceHandler) pingv1connect.PingServiceClient { mux := http.NewServeMux() mux.Handle(pingv1connect.NewPingServiceHandler(pingServer)) server := memhttptest.NewServer(t, mux) @@ -1522,7 +1522,7 @@ func TestStreamForServer(t *testing.T) { } t.Run("not-proto-message", func(t *testing.T) { t.Parallel() - client := newPingClient(&pluggablePingServer{ + client := newPingClient(t, &pluggablePingServer{ cumSum: func(ctx context.Context, stream *connect.BidiStream[pingv1.CumSumRequest, pingv1.CumSumResponse]) error { return stream.Conn().Send("foobar") }, @@ -1536,7 +1536,7 @@ func TestStreamForServer(t *testing.T) { }) t.Run("nil-message", func(t *testing.T) { t.Parallel() - client := newPingClient(&pluggablePingServer{ + client := newPingClient(t, &pluggablePingServer{ cumSum: func(ctx context.Context, stream *connect.BidiStream[pingv1.CumSumRequest, pingv1.CumSumResponse]) error { return stream.Send(nil) }, @@ -1550,7 +1550,7 @@ func TestStreamForServer(t *testing.T) { }) t.Run("get-spec", func(t *testing.T) { t.Parallel() - client := newPingClient(&pluggablePingServer{ + client := newPingClient(t, &pluggablePingServer{ cumSum: func(ctx context.Context, stream *connect.BidiStream[pingv1.CumSumRequest, pingv1.CumSumResponse]) error { assert.Equal(t, stream.Spec().StreamType, connect.StreamTypeBidi) assert.Equal(t, stream.Spec().Procedure, pingv1connect.PingServiceCumSumProcedure) @@ -1564,7 +1564,7 @@ func TestStreamForServer(t *testing.T) { }) t.Run("server-stream", func(t *testing.T) { t.Parallel() - client := newPingClient(&pluggablePingServer{ + client := newPingClient(t, &pluggablePingServer{ countUp: func(ctx context.Context, req *connect.Request[pingv1.CountUpRequest], stream *connect.ServerStream[pingv1.CountUpResponse]) error { assert.Equal(t, stream.Conn().Spec().StreamType, connect.StreamTypeServer) assert.Equal(t, stream.Conn().Spec().Procedure, pingv1connect.PingServiceCountUpProcedure) @@ -1580,7 +1580,7 @@ func TestStreamForServer(t *testing.T) { }) t.Run("server-stream-send", func(t *testing.T) { t.Parallel() - client := newPingClient(&pluggablePingServer{ + client := newPingClient(t, &pluggablePingServer{ countUp: func(ctx context.Context, req *connect.Request[pingv1.CountUpRequest], stream *connect.ServerStream[pingv1.CountUpResponse]) error { assert.Nil(t, stream.Send(&pingv1.CountUpResponse{Number: 1})) return nil @@ -1596,7 +1596,7 @@ func TestStreamForServer(t *testing.T) { }) t.Run("server-stream-send-nil", func(t *testing.T) { t.Parallel() - client := newPingClient(&pluggablePingServer{ + client := newPingClient(t, &pluggablePingServer{ countUp: func(ctx context.Context, req *connect.Request[pingv1.CountUpRequest], stream *connect.ServerStream[pingv1.CountUpResponse]) error { stream.ResponseHeader().Set("foo", "bar") stream.ResponseTrailer().Set("bas", "blah") @@ -1617,7 +1617,7 @@ func TestStreamForServer(t *testing.T) { }) t.Run("client-stream", func(t *testing.T) { t.Parallel() - client := newPingClient(&pluggablePingServer{ + client := newPingClient(t, &pluggablePingServer{ sum: func(ctx context.Context, stream *connect.ClientStream[pingv1.SumRequest]) (*connect.Response[pingv1.SumResponse], error) { assert.Equal(t, stream.Spec().StreamType, connect.StreamTypeClient) assert.Equal(t, stream.Spec().Procedure, pingv1connect.PingServiceSumProcedure) @@ -1638,8 +1638,9 @@ func TestStreamForServer(t *testing.T) { }) t.Run("client-stream-conn", func(t *testing.T) { t.Parallel() - client := newPingClient(&pluggablePingServer{ + client := newPingClient(t, &pluggablePingServer{ sum: func(ctx context.Context, stream *connect.ClientStream[pingv1.SumRequest]) (*connect.Response[pingv1.SumResponse], error) { + assert.True(t, stream.Receive()) assert.NotNil(t, stream.Conn().Send("not-proto")) return connect.NewResponse(&pingv1.SumResponse{}), nil }, @@ -1652,8 +1653,9 @@ func TestStreamForServer(t *testing.T) { }) t.Run("client-stream-send-msg", func(t *testing.T) { t.Parallel() - client := newPingClient(&pluggablePingServer{ + client := newPingClient(t, &pluggablePingServer{ sum: func(ctx context.Context, stream *connect.ClientStream[pingv1.SumRequest]) (*connect.Response[pingv1.SumResponse], error) { + assert.True(t, stream.Receive()) assert.Nil(t, stream.Conn().Send(&pingv1.SumResponse{Sum: 2})) return connect.NewResponse(&pingv1.SumResponse{}), nil }, diff --git a/duplex_http_call.go b/duplex_http_call.go index 7181dd65..bc2fefee 100644 --- a/duplex_http_call.go +++ b/duplex_http_call.go @@ -105,6 +105,12 @@ func (d *duplexHTTPCall) Write(data []byte) (int, error) { // reads from the other side. bytesWritten, err := d.requestBodyWriter.Write(data) if err != nil && errors.Is(err, io.ErrClosedPipe) { + // On sending headers we write a zero-length message to the server. + // If the server closes the connection without reading the body, + // we'll get an io.ErrClosedPipe. We can ignore this error. + if len(data) == 0 { + return 0, nil + } // Signal that the stream is closed with the more-typical io.EOF instead of // io.ErrClosedPipe. This makes it easier for protocol-specific wrappers to // match grpc-go's behavior. From c0c4d635fac5c3fd2403ea8d981eaf48db8cd4a6 Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Mon, 6 Nov 2023 12:07:31 -0500 Subject: [PATCH 2/5] Fix lint --- connect_ext_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/connect_ext_test.go b/connect_ext_test.go index c53eb204..b062d38e 100644 --- a/connect_ext_test.go +++ b/connect_ext_test.go @@ -1511,6 +1511,7 @@ func TestBidiStreamServerSendsFirstMessage(t *testing.T) { func TestStreamForServer(t *testing.T) { t.Parallel() newPingClient := func(t *testing.T, pingServer pingv1connect.PingServiceHandler) pingv1connect.PingServiceClient { + t.Helper() mux := http.NewServeMux() mux.Handle(pingv1connect.NewPingServiceHandler(pingServer)) server := memhttptest.NewServer(t, mux) From 3b261504079e43c19f7f7c8fa6004070510d8ec8 Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Thu, 9 Nov 2023 17:48:39 -0500 Subject: [PATCH 3/5] Avoid nil first write errors --- duplex_http_call.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/duplex_http_call.go b/duplex_http_call.go index bc2fefee..1f73adbc 100644 --- a/duplex_http_call.go +++ b/duplex_http_call.go @@ -96,21 +96,20 @@ func newDuplexHTTPCall( // Write to the request body. func (d *duplexHTTPCall) Write(data []byte) (int, error) { - d.ensureRequestMade() + isFirst := d.ensureRequestMade() // Before we send any data, check if the context has been canceled. if err := d.ctx.Err(); err != nil { return 0, wrapIfContextError(err) } + if isFirst && data == nil { + // On first write a nil Send is used to send request headers. Avoid + // writing a zero-length payload to avoid superfluous errors with close. + return 0, nil + } // It's safe to write to this side of the pipe while net/http concurrently // reads from the other side. bytesWritten, err := d.requestBodyWriter.Write(data) if err != nil && errors.Is(err, io.ErrClosedPipe) { - // On sending headers we write a zero-length message to the server. - // If the server closes the connection without reading the body, - // we'll get an io.ErrClosedPipe. We can ignore this error. - if len(data) == 0 { - return 0, nil - } // Signal that the stream is closed with the more-typical io.EOF instead of // io.ErrClosedPipe. This makes it easier for protocol-specific wrappers to // match grpc-go's behavior. @@ -235,10 +234,12 @@ func (d *duplexHTTPCall) BlockUntilResponseReady() error { // ensureRequestMade sends the request headers and starts the response stream. // It is not safe to call this concurrently. Write and CloseWrite call this but // ensure that they're not called concurrently. -func (d *duplexHTTPCall) ensureRequestMade() { +func (d *duplexHTTPCall) ensureRequestMade() (isFirst bool) { d.sendRequestOnce.Do(func() { + isFirst = true go d.makeRequest() }) + return isFirst } func (d *duplexHTTPCall) makeRequest() { From fb24122e633c2403929741c6328201c21266cee8 Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Thu, 9 Nov 2023 18:34:45 -0500 Subject: [PATCH 4/5] Use atomic over once closure --- duplex_http_call.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/duplex_http_call.go b/duplex_http_call.go index 1f73adbc..83741dde 100644 --- a/duplex_http_call.go +++ b/duplex_http_call.go @@ -21,7 +21,7 @@ import ( "io" "net/http" "net/url" - "sync" + "sync/atomic" ) // duplexHTTPCall is a full-duplex stream between the client and server. The @@ -43,8 +43,8 @@ type duplexHTTPCall struct { requestBodyWriter *io.PipeWriter // sendRequestOnce ensures we only send the request once. - sendRequestOnce sync.Once - request *http.Request + requestSent atomic.Bool + request *http.Request // responseReady is closed when the response is ready or when the request // fails. Any error on request initialisation will be set on the @@ -235,11 +235,11 @@ func (d *duplexHTTPCall) BlockUntilResponseReady() error { // It is not safe to call this concurrently. Write and CloseWrite call this but // ensure that they're not called concurrently. func (d *duplexHTTPCall) ensureRequestMade() (isFirst bool) { - d.sendRequestOnce.Do(func() { - isFirst = true + if d.requestSent.CompareAndSwap(false, true) { go d.makeRequest() - }) - return isFirst + return true + } + return false } func (d *duplexHTTPCall) makeRequest() { From d396735b3f69b0c5a3b090973a446b5cc0463030 Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Thu, 9 Nov 2023 18:38:33 -0500 Subject: [PATCH 5/5] Fix doc --- duplex_http_call.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/duplex_http_call.go b/duplex_http_call.go index 83741dde..35075bd5 100644 --- a/duplex_http_call.go +++ b/duplex_http_call.go @@ -42,7 +42,7 @@ type duplexHTTPCall struct { requestBodyReader *io.PipeReader requestBodyWriter *io.PipeWriter - // sendRequestOnce ensures we only send the request once. + // requestSent ensures we only send the request once. requestSent atomic.Bool request *http.Request