Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use "unimplemented" code for cardinality violations #712

Merged
merged 2 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 60 additions & 9 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,34 +354,85 @@ type handlerConnCloser interface {
Close(error) error
}

// receiveConn represents the shared methods of both StreamingClientConn and StreamingHandlerConn
// that the below helper functions use for implementing the rules around a "unary" stream, that
// is expected to have exactly one message (or zero messages followed by a non-EOF error).
type receiveConn interface {
Spec() Spec
Receive(any) error
}

// hasHTTPMethod is implemented by streaming connections that support HTTP methods other than
// POST.
type hasHTTPMethod interface {
getHTTPMethod() string
}

// receiveUnaryResponse unmarshals a message from a StreamingClientConn, then
// envelopes the message and attaches headers and trailers. It attempts to
// consume the response stream and isn't appropriate when receiving multiple
// messages.
func receiveUnaryResponse[T any](conn StreamingClientConn, initializer maybeInitializer) (*Response[T], error) {
msg, err := receiveUnaryMessage[T](conn, initializer)
if err != nil {
return nil, err
}
return &Response[T]{
Msg: msg,
header: conn.ResponseHeader(),
trailer: conn.ResponseTrailer(),
}, nil
}

// receiveUnaryRequest unmarshals a message from a StreamingClientConn, then
// envelopes the message and attaches headers and other request properties. It
// attempts to consume the request stream and isn't appropriate when receiving
// multiple messages.
func receiveUnaryRequest[T any](conn StreamingHandlerConn, initializer maybeInitializer) (*Request[T], error) {
msg, err := receiveUnaryMessage[T](conn, initializer)
if err != nil {
return nil, err
}
method := http.MethodPost
if hasRequestMethod, ok := conn.(hasHTTPMethod); ok {
method = hasRequestMethod.getHTTPMethod()
}
return &Request[T]{
Msg: msg,
spec: conn.Spec(),
peer: conn.Peer(),
header: conn.RequestHeader(),
method: method,
}, nil
}

func receiveUnaryMessage[T any](conn receiveConn, initializer maybeInitializer) (*T, error) {
var msg T
if err := initializer.maybe(conn.Spec(), &msg); err != nil {
return nil, err
}
// Possibly counter-intuitive, but the gRPC specs about error codes state that both clients
// and servers should return "unimplemented" when they encounter a cardinality violation: where
// the number of messages in the stream is wrong. Search for "cardinality violation" in the
// following docs:
// https://grpc.github.io/grpc/core/md_doc_statuscodes.html
if err := conn.Receive(&msg); err != nil {
if errors.Is(err, io.EOF) {
err = NewError(CodeUnimplemented, errors.New("unary request has zero messages"))
}
return nil, err
}
// In a well-formed stream, the response message may be followed by a block
// of in-stream trailers or HTTP trailers. To ensure that we receive the
// trailers, try to read another message from the stream.
// In a well-formed stream, the one message must be the only content in the body.
// To verify that it is well-formed, try to read another message from the stream.
// TODO: optimise unary calls to avoid this extra receive.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This TODO should be removed now, right? We always want to check stream cardinality.

Copy link
Member Author

@jhump jhump Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I interpreted the comment as wanting this second call to be more efficient. Currently, it requires allocating the second message, trying to fully read the subsequent message on the stream, and unmarshalling the data into the message. An optimized flow could do something cheaper and allocation-free to verify whether it has reached the end of stream or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I revised the comment. Seem okay?

var msg2 T
if err := initializer.maybe(conn.Spec(), &msg2); err != nil {
return nil, err
}
if err := conn.Receive(&msg2); err == nil {
return nil, NewError(CodeUnknown, errors.New("unary stream has multiple messages"))
return nil, NewError(CodeUnimplemented, errors.New("unary response has multiple messages"))
} else if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}
return &Response[T]{
Msg: &msg,
header: conn.ResponseHeader(),
trailer: conn.ResponseTrailer(),
}, nil
return &msg, nil
}
3 changes: 2 additions & 1 deletion connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,7 @@ func TestStreamForServer(t *testing.T) {
client := newPingClient(t, &pluggablePingServer{
sum: func(ctx context.Context, stream *connect.ClientStream[pingv1.SumRequest]) (*connect.Response[pingv1.SumResponse], error) {
assert.True(t, stream.Receive())
// We end up sending two response messages, but only one is expected.
assert.Nil(t, stream.Conn().Send(&pingv1.SumResponse{Sum: 2}))
return connect.NewResponse(&pingv1.SumResponse{}), nil
},
Expand All @@ -1697,7 +1698,7 @@ func TestStreamForServer(t *testing.T) {
assert.Nil(t, stream.Send(&pingv1.SumRequest{Number: 1}))
res, err := stream.CloseAndReceive()
assert.NotNil(t, err)
assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown)
assert.Equal(t, connect.CodeOf(err), connect.CodeUnimplemented)
assert.Nil(t, res)
})
}
Expand Down
37 changes: 5 additions & 32 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,10 @@ func NewUnaryHandler[Req, Res any](
}
// Given a stream, how should we call the unary function?
implementation := func(ctx context.Context, conn StreamingHandlerConn) error {
var msg Req
if err := config.Initializer.maybe(conn.Spec(), &msg); err != nil {
return err
}
if err := conn.Receive(&msg); err != nil {
request, err := receiveUnaryRequest[Req](conn, config.Initializer)
if err != nil {
return err
}
method := http.MethodPost
if hasRequestMethod, ok := conn.(interface{ getHTTPMethod() string }); ok {
method = hasRequestMethod.getHTTPMethod()
}
request := &Request[Req]{
Msg: &msg,
spec: conn.Spec(),
peer: conn.Peer(),
header: conn.RequestHeader(),
method: method,
}
response, err := untyped(ctx, request)
if err != nil {
return err
Expand Down Expand Up @@ -140,24 +126,11 @@ func NewServerStreamHandler[Req, Res any](
return newStreamHandler(
config,
func(ctx context.Context, conn StreamingHandlerConn) error {
var msg Req
if err := config.Initializer.maybe(conn.Spec(), &msg); err != nil {
return err
}
if err := conn.Receive(&msg); err != nil {
req, err := receiveUnaryRequest[Req](conn, config.Initializer)
if err != nil {
return err
}
return implementation(
ctx,
&Request[Req]{
Msg: &msg,
spec: conn.Spec(),
peer: conn.Peer(),
header: conn.RequestHeader(),
method: http.MethodPost,
},
&ServerStream[Res]{conn: conn},
)
return implementation(ctx, req, &ServerStream[Res]{conn: conn})
},
)
}
Expand Down
12 changes: 12 additions & 0 deletions internal/conformance/known-failing.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,15 @@ HTTP to Connect Code Mapping/**/payload-too-large
HTTP to Connect Code Mapping/**/precondition-failed
HTTP to Connect Code Mapping/**/request-header-fields-too-large
HTTP to Connect Code Mapping/**/request-timeout

# The current v1.0.0-rc3 of conformance suite has expectations for these
# conditions that were based on the behavior of grpc-go (which returns an
# "unknown" error), with the incorrect idea that was authoritative (and,
# honestly, that code makes sense). However, the actual correct behavior,
# per the specification for gRPC error codes, is for these cardinality
# violations to instead return "unimplemented":
# https://grpc.github.io/grpc/core/md_doc_statuscodes.html
# This library returns the correct code, which (for now) is interpreted
# as a failure by the conformance suite.
**/unary-ok-but-no-response
**/unary-multiple-responses
Loading