Skip to content

Commit

Permalink
Make client.Send(nil) send request headers
Browse files Browse the repository at this point in the history
Adding to the StreamingHandlerConn interface isn't backward-compatible,
but we need a way for clients to send request headers without a body.
This commit removes the previous commit's `SendHeaders` method, but makes
`Send(nil)` accomplish the same thing.
  • Loading branch information
akshayjshah committed Nov 7, 2022
1 parent 52877cf commit 7991753
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 23 deletions.
14 changes: 8 additions & 6 deletions client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (c *ClientStreamForClient[Req, Res]) Send(request *Req) error {
if c.err != nil {
return c.err
}
if request == nil {
return c.conn.Send(nil)
}
return c.conn.Send(request)
}

Expand Down Expand Up @@ -184,11 +187,6 @@ func (b *BidiStreamForClient[_, _]) Peer() Peer {
return b.conn.Peer()
}

// SendHeaders sends the request headers.
func (b *BidiStreamForClient[_, _]) SendHeaders() {
b.conn.SendHeaders()
}

// RequestHeader returns the request headers. Headers are sent with the first
// call to Send.
func (b *BidiStreamForClient[Req, Res]) RequestHeader() http.Header {
Expand All @@ -199,7 +197,8 @@ func (b *BidiStreamForClient[Req, Res]) RequestHeader() http.Header {
}

// Send a message to the server. The first call to Send also sends the request
// headers.
// headers. To send just the request headers, without a body, call Send with a
// nil pointer.
//
// If the server returns an error, Send returns an error that wraps [io.EOF].
// Clients should check for EOF using the standard library's [errors.Is] and
Expand All @@ -208,6 +207,9 @@ func (b *BidiStreamForClient[Req, Res]) Send(msg *Req) error {
if b.err != nil {
return b.err
}
if msg == nil {
return b.conn.Send(nil)
}
return b.conn.Send(msg)
}

Expand Down
5 changes: 2 additions & 3 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,9 @@ type StreamingClientConn interface {
Spec() Spec
Peer() Peer

// Send, SendHeaders, RequestHeader, and CloseRequest may race with each other,
// but must be safe to call concurrently with all other methods.
// Send, RequestHeader, and CloseRequest may race with each other, but must
// be safe to call concurrently with all other methods.
Send(any) error
SendHeaders()
RequestHeader() http.Header
CloseRequest() error

Expand Down
4 changes: 2 additions & 2 deletions connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ func TestClientWithSendMaxBytes(t *testing.T) {
})
}

func TestBidiStreamForClientSendHeaders(t *testing.T) {
func TestBidiStreamServerSendsFirstMessage(t *testing.T) {
t.Parallel()
run := func(t *testing.T, opts ...connect.ClientOption) {
t.Helper()
Expand Down Expand Up @@ -1372,7 +1372,7 @@ func TestBidiStreamForClientSendHeaders(t *testing.T) {
assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
})
stream.SendHeaders()
assert.Nil(t, stream.Send(nil))
select {
case <-time.After(time.Second):
t.Error("timed out to get request headers")
Expand Down
9 changes: 9 additions & 0 deletions envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ type envelopeWriter struct {
}

func (w *envelopeWriter) Marshal(message any) *Error {
if message == nil {
if _, err := w.writer.Write(nil); err != nil {
if connectErr, ok := asError(err); ok {
return connectErr
}
return NewError(CodeUnknown, err)
}
return nil
}
raw, err := w.codec.Marshal(message)
if err != nil {
return errorf(CodeInternal, "marshal message: %w", err)
Expand Down
6 changes: 6 additions & 0 deletions handler_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func (s *ServerStream[Res]) ResponseTrailer() http.Header {
// Send a message to the client. The first call to Send also sends the response
// headers.
func (s *ServerStream[Res]) Send(msg *Res) error {
if msg == nil {
return s.conn.Send(nil)
}
return s.conn.Send(msg)
}

Expand Down Expand Up @@ -161,6 +164,9 @@ func (b *BidiStream[Req, Res]) ResponseTrailer() http.Header {
// Send a message to the client. The first call to Send also sends the response
// headers.
func (b *BidiStream[Req, Res]) Send(msg *Res) error {
if msg == nil {
return b.conn.Send(nil)
}
return b.conn.Send(msg)
}

Expand Down
11 changes: 3 additions & 8 deletions protocol_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,6 @@ func (cc *connectUnaryClientConn) Send(msg any) error {
return nil // must be a literal nil: nil *Error is a non-nil error
}

func (cc *connectUnaryClientConn) SendHeaders() {
cc.duplexCall.ensureRequestMade()
}

func (cc *connectUnaryClientConn) RequestHeader() http.Header {
return cc.duplexCall.Header()
}
Expand Down Expand Up @@ -460,10 +456,6 @@ func (cc *connectStreamingClientConn) Send(msg any) error {
return nil // must be a literal nil: nil *Error is a non-nil error
}

func (cc *connectStreamingClientConn) SendHeaders() {
cc.duplexCall.ensureRequestMade()
}

func (cc *connectStreamingClientConn) RequestHeader() http.Header {
return cc.duplexCall.Header()
}
Expand Down Expand Up @@ -753,6 +745,9 @@ type connectUnaryMarshaler struct {
}

func (m *connectUnaryMarshaler) Marshal(message any) *Error {
if message == nil {
return m.write(nil)
}
data, err := m.codec.Marshal(message)
if err != nil {
return errorf(CodeInternal, "marshal message: %w", err)
Expand Down
4 changes: 0 additions & 4 deletions protocol_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,6 @@ func (cc *grpcClientConn) Send(msg any) error {
return nil // must be a literal nil: nil *Error is a non-nil error
}

func (cc *grpcClientConn) SendHeaders() {
cc.duplexCall.ensureRequestMade()
}

func (cc *grpcClientConn) RequestHeader() http.Header {
return cc.duplexCall.Header()
}
Expand Down

0 comments on commit 7991753

Please sign in to comment.