diff --git a/README.md b/README.md index 16b45556..c551084a 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,10 @@ compression, and content type negotiation. It also generates an idiomatic, type-safe client. Handlers and clients support three protocols: gRPC, gRPC-Web, and Connect's own protocol. -The [Connect protocol][protocol] is a simple, POST-only protocol that works -over HTTP/1.1 or HTTP/2. It takes the best portions of gRPC and gRPC-Web, -including streaming, and packages them into a protocol that works equally well -in browsers, monoliths, and microservices. Calling a Connect API is as easy as +The [Connect protocol][protocol] is a simple protocol that works over HTTP/1.1 +or HTTP/2. It takes the best portions of gRPC and gRPC-Web, including +streaming, and packages them into a protocol that works equally well in +browsers, monoliths, and microservices. Calling a Connect API is as easy as using `curl`. Try it with our live demo: ``` diff --git a/client.go b/client.go index 142d55e4..08a1c066 100644 --- a/client.go +++ b/client.go @@ -62,6 +62,9 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien BufferPool: config.BufferPool, ReadMaxBytes: config.ReadMaxBytes, SendMaxBytes: config.SendMaxBytes, + EnableGet: config.EnableGet, + GetURLMaxBytes: config.GetURLMaxBytes, + GetUseFallback: config.GetUseFallback, }, ) if protocolErr != nil { @@ -188,6 +191,10 @@ type clientConfig struct { BufferPool *bufferPool ReadMaxBytes int SendMaxBytes int + EnableGet bool + GetURLMaxBytes int + GetUseFallback bool + IdempotencyLevel IdempotencyLevel } func newClientConfig(rawURL string, options []ClientOption) (*clientConfig, *Error) { @@ -235,9 +242,10 @@ func (c *clientConfig) protobuf() Codec { func (c *clientConfig) newSpec(t StreamType) Spec { return Spec{ - StreamType: t, - Procedure: c.Procedure, - IsClient: true, + StreamType: t, + Procedure: c.Procedure, + IsClient: true, + IdempotencyLevel: c.IdempotencyLevel, } } diff --git a/client_ext_test.go b/client_ext_test.go index c509e698..e448bd59 100644 --- a/client_ext_test.go +++ b/client_ext_test.go @@ -19,6 +19,7 @@ import ( "errors" "net/http" "net/http/httptest" + "strings" "testing" "github.com/bufbuild/connect-go" @@ -89,8 +90,12 @@ func TestClientPeer(t *testing.T) { ) ctx := context.Background() // unary - _, err := client.Ping(ctx, connect.NewRequest(&pingv1.PingRequest{})) + _, err := client.Ping(ctx, connect.NewRequest[pingv1.PingRequest](nil)) assert.Nil(t, err) + text := strings.Repeat(".", 256) + r, err := client.Ping(ctx, connect.NewRequest(&pingv1.PingRequest{Text: text})) + assert.Nil(t, err) + assert.Equal(t, r.Msg.Text, text) // client streaming clientStream := client.Sum(ctx) t.Cleanup(func() { @@ -123,6 +128,13 @@ func TestClientPeer(t *testing.T) { t.Parallel() run(t) }) + t.Run("connect+get", func(t *testing.T) { + t.Parallel() + run(t, + connect.WithHTTPGet(), + connect.WithSendGzip(), + ) + }) t.Run("grpc", func(t *testing.T) { t.Parallel() run(t, connect.WithGRPC()) diff --git a/client_get_fallback_test.go b/client_get_fallback_test.go new file mode 100644 index 00000000..caf246b1 --- /dev/null +++ b/client_get_fallback_test.go @@ -0,0 +1,62 @@ +// Copyright 2021-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connect + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/bufbuild/connect-go/internal/assert" + pingv1 "github.com/bufbuild/connect-go/internal/gen/connect/ping/v1" +) + +func TestClientUnaryGetFallback(t *testing.T) { + t.Parallel() + mux := http.NewServeMux() + mux.Handle("/connect.ping.v1.PingService/Ping", NewUnaryHandler( + "/connect.ping.v1.PingService/Ping", + func(ctx context.Context, r *Request[pingv1.PingRequest]) (*Response[pingv1.PingResponse], error) { + return NewResponse(&pingv1.PingResponse{ + Number: r.Msg.Number, + Text: r.Msg.Text, + }), nil + }, + WithIdempotency(IdempotencyNoSideEffects), + )) + server := httptest.NewUnstartedServer(mux) + server.EnableHTTP2 = true + server.StartTLS() + t.Cleanup(server.Close) + + client := NewClient[pingv1.PingRequest, pingv1.PingResponse]( + server.Client(), + server.URL+"/connect.ping.v1.PingService/Ping", + WithHTTPGet(), + withHTTPGetMaxURLSize(1, true), + WithSendGzip(), + ) + ctx := context.Background() + + _, err := client.CallUnary(ctx, NewRequest[pingv1.PingRequest](nil)) + assert.Nil(t, err) + + text := strings.Repeat(".", 256) + r, err := client.CallUnary(ctx, NewRequest(&pingv1.PingRequest{Text: text})) + assert.Nil(t, err) + assert.Equal(t, r.Msg.Text, text) +} diff --git a/cmd/protoc-gen-connect-go/main.go b/cmd/protoc-gen-connect-go/main.go index 3f5198a4..33e10e6a 100644 --- a/cmd/protoc-gen-connect-go/main.go +++ b/cmd/protoc-gen-connect-go/main.go @@ -102,6 +102,17 @@ func main() { ) } +func needsWithIdempotency(file *protogen.File) bool { + for _, service := range file.Services { + for _, method := range service.Methods { + if methodIdempotency(method) != connect.IdempotencyUnknown { + return true + } + } + } + return false +} + func generate(plugin *protogen.Plugin, file *protogen.File) { if len(file.Services) == 0 { return @@ -163,7 +174,11 @@ func generatePreamble(g *protogen.GeneratedFile, file *protogen.File) { "is not defined, this code was generated with a version of connect newer than the one ", "compiled into your binary. You can fix the problem by either regenerating this code ", "with an older version of connect or updating the connect version compiled into your binary.") - g.P("const _ = ", connectPackage.Ident("IsAtLeastVersion0_1_0")) + if needsWithIdempotency(file) { + g.P("const _ = ", connectPackage.Ident("IsAtLeastVersion1_6_0")) + } else { + g.P("const _ = ", connectPackage.Ident("IsAtLeastVersion0_1_0")) + } g.P() } @@ -262,7 +277,17 @@ func generateClientImplementation(g *protogen.GeneratedFile, service *protogen.S ) g.P("httpClient,") g.P(`baseURL + `, procedureConstName(method), `,`) - g.P("opts...,") + idempotency := methodIdempotency(method) + switch idempotency { + case connect.IdempotencyNoSideEffects: + g.P(connectPackage.Ident("WithIdempotency"), "(", connectPackage.Ident("IdempotencyNoSideEffects"), "),") + g.P(connectPackage.Ident("WithClientOptions"), "(opts...),") + case connect.IdempotencyIdempotent: + g.P(connectPackage.Ident("WithIdempotency"), "(", connectPackage.Ident("IdempotencyIdempotent"), "),") + g.P(connectPackage.Ident("WithClientOptions"), "(opts...),") + case connect.IdempotencyUnknown: + g.P("opts...,") + } g.P("),") } g.P("}") @@ -376,6 +401,7 @@ func generateServerConstructor(g *protogen.GeneratedFile, service *protogen.Serv for _, method := range service.Methods { isStreamingServer := method.Desc.IsStreamingServer() isStreamingClient := method.Desc.IsStreamingClient() + idempotency := methodIdempotency(method) switch { case isStreamingClient && !isStreamingServer: g.P(`mux.Handle(`, procedureConstName(method), `, `, connectPackage.Ident("NewClientStreamHandler"), "(") @@ -388,7 +414,16 @@ func generateServerConstructor(g *protogen.GeneratedFile, service *protogen.Serv } g.P(procedureConstName(method), `,`) g.P("svc.", method.GoName, ",") - g.P("opts...,") + switch idempotency { + case connect.IdempotencyNoSideEffects: + g.P(connectPackage.Ident("WithIdempotency"), "(", connectPackage.Ident("IdempotencyNoSideEffects"), "),") + g.P(connectPackage.Ident("WithHandlerOptions"), "(opts...),") + case connect.IdempotencyIdempotent: + g.P(connectPackage.Ident("WithIdempotency"), "(", connectPackage.Ident("IdempotencyIdempotent"), "),") + g.P(connectPackage.Ident("WithHandlerOptions"), "(opts...),") + case connect.IdempotencyUnknown: + g.P("opts...,") + } g.P("))") } g.P(`return "/`, reflectionName(service), `/", mux`) @@ -477,6 +512,22 @@ func isDeprecatedMethod(method *protogen.Method) bool { return ok && methodOptions.GetDeprecated() } +func methodIdempotency(method *protogen.Method) connect.IdempotencyLevel { + methodOptions, ok := method.Desc.Options().(*descriptorpb.MethodOptions) + if !ok { + return connect.IdempotencyUnknown + } + switch methodOptions.GetIdempotencyLevel() { + case descriptorpb.MethodOptions_NO_SIDE_EFFECTS: + return connect.IdempotencyNoSideEffects + case descriptorpb.MethodOptions_IDEMPOTENT: + return connect.IdempotencyIdempotent + case descriptorpb.MethodOptions_IDEMPOTENCY_UNKNOWN: + return connect.IdempotencyUnknown + } + return connect.IdempotencyUnknown +} + // Raggedy comments in the generated code are driving me insane. This // word-wrapping function is ruinously inefficient, but it gets the job done. func wrapComments(g *protogen.GeneratedFile, elems ...any) { diff --git a/codec.go b/codec.go index 1fb7cd11..bc0fe989 100644 --- a/codec.go +++ b/codec.go @@ -15,6 +15,8 @@ package connect import ( + "bytes" + "encoding/json" "errors" "fmt" @@ -51,6 +53,32 @@ type Codec interface { Unmarshal([]byte, any) error } +// stableCodec is an extension to Codec for serializing with stable output. +type stableCodec interface { + Codec + + // MarshalStable marshals the given message with stable field ordering. + // + // MarshalStable should return the same output for a given input. Although + // it is not guaranteed to be canonicalized, the marshalling routine for + // MarshalStable will opt for the most normalized output available for a + // given serialization. + // + // For practical reasons, it is possible for MarshalStable to return two + // different results for two inputs considered to be "equal" in their own + // domain, and it may change in the future with codec updates, but for + // any given concrete value and any given version, it should return the + // same output. + MarshalStable(any) ([]byte, error) + + // IsBinary returns true if the marshalled data is binary for this codec. + // + // If this function returns false, the data returned from Marshal and + // MarshalStable are considered valid text and may be used in contexts + // where text is expected. + IsBinary() bool +} + type protoBinaryCodec struct{} var _ Codec = (*protoBinaryCodec)(nil) @@ -73,6 +101,24 @@ func (c *protoBinaryCodec) Unmarshal(data []byte, message any) error { return proto.Unmarshal(data, protoMessage) } +func (c *protoBinaryCodec) MarshalStable(message any) ([]byte, error) { + protoMessage, ok := message.(proto.Message) + if !ok { + return nil, errNotProto(message) + } + // protobuf does not offer a canonical output today, so this format is not + // guaranteed to match deterministic output from other protobuf libraries. + // In addition, unknown fields may cause inconsistent output for otherwise + // equal messages. + // https://github.com/golang/protobuf/issues/1121 + options := proto.MarshalOptions{Deterministic: true} + return options.Marshal(protoMessage) +} + +func (c *protoBinaryCodec) IsBinary() bool { + return true +} + type protoJSONCodec struct { name string } @@ -102,6 +148,27 @@ func (c *protoJSONCodec) Unmarshal(binary []byte, message any) error { return options.Unmarshal(binary, protoMessage) } +func (c *protoJSONCodec) MarshalStable(message any) ([]byte, error) { + // protojson does not offer a "deterministic" field ordering, but fields + // are still ordered consistently by their index. However, protojson can + // output inconsistent whitespace for some reason, therefore it is + // suggested to use a formatter to ensure consistent formatting. + // https://github.com/golang/protobuf/issues/1373 + messageJSON, err := c.Marshal(message) + if err != nil { + return nil, err + } + compactedJSON := bytes.NewBuffer(messageJSON[:0]) + if err = json.Compact(compactedJSON, messageJSON); err != nil { + return nil, err + } + return compactedJSON.Bytes(), nil +} + +func (c *protoJSONCodec) IsBinary() bool { + return false +} + // readOnlyCodecs is a read-only interface to a map of named codecs. type readOnlyCodecs interface { // Get gets the Codec with the given name. diff --git a/codec_test.go b/codec_test.go index 45dc51af..b27e2562 100644 --- a/codec_test.go +++ b/codec_test.go @@ -15,13 +15,88 @@ package connect import ( + "bytes" "strings" "testing" + "testing/quick" "github.com/bufbuild/connect-go/internal/assert" + pingv1 "github.com/bufbuild/connect-go/internal/gen/connect/ping/v1" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/structpb" ) +func convertMapToInterface(stringMap map[string]string) map[string]interface{} { + interfaceMap := make(map[string]interface{}) + for key, value := range stringMap { + interfaceMap[key] = value + } + return interfaceMap +} + +func TestCodecRoundTrips(t *testing.T) { + t.Parallel() + makeRoundtrip := func(codec Codec) func(string, int64) bool { + return func(text string, number int64) bool { + got := pingv1.PingRequest{} + want := pingv1.PingRequest{Text: text, Number: number} + data, err := codec.Marshal(&want) + if err != nil { + t.Fatal(err) + } + err = codec.Unmarshal(data, &got) + if err != nil { + t.Fatal(err) + } + return proto.Equal(&got, &want) + } + } + if err := quick.Check(makeRoundtrip(&protoBinaryCodec{}), nil /* config */); err != nil { + t.Error(err) + } + if err := quick.Check(makeRoundtrip(&protoJSONCodec{}), nil /* config */); err != nil { + t.Error(err) + } +} + +func TestStableCodec(t *testing.T) { + t.Parallel() + makeRoundtrip := func(codec stableCodec) func(map[string]string) bool { + return func(input map[string]string) bool { + initialProto, err := structpb.NewStruct(convertMapToInterface(input)) + if err != nil { + t.Fatal(err) + } + want, err := codec.MarshalStable(initialProto) + if err != nil { + t.Fatal(err) + } + for i := 0; i < 10; i++ { + roundtripProto := &structpb.Struct{} + err = codec.Unmarshal(want, roundtripProto) + if err != nil { + t.Fatal(err) + } + got, err := codec.MarshalStable(roundtripProto) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, want) { + return false + } + } + return true + } + } + if err := quick.Check(makeRoundtrip(&protoBinaryCodec{}), nil /* config */); err != nil { + t.Error(err) + } + if err := quick.Check(makeRoundtrip(&protoJSONCodec{}), nil /* config */); err != nil { + t.Error(err) + } +} + func TestJSONCodec(t *testing.T) { t.Parallel() diff --git a/connect.go b/connect.go index 75e87417..a0749e1a 100644 --- a/connect.go +++ b/connect.go @@ -39,6 +39,7 @@ const Version = "1.7.0-dev" const ( IsAtLeastVersion0_0_1 = true IsAtLeastVersion0_1_0 = true + IsAtLeastVersion1_6_0 = true ) // StreamType describes whether the client, server, neither, or both is @@ -275,9 +276,10 @@ type HTTPClient interface { // If you're using Protobuf, protoc-gen-connect-go generates a constant for the // fully-qualified Procedure corresponding to each RPC in your schema. type Spec struct { - StreamType StreamType - Procedure string // for example, "/acme.foo.v1.FooService/Bar" - IsClient bool // otherwise we're in a handler + StreamType StreamType + Procedure string // for example, "/acme.foo.v1.FooService/Bar" + IsClient bool // otherwise we're in a handler + IdempotencyLevel IdempotencyLevel } // Peer describes the other party to an RPC. @@ -289,9 +291,13 @@ type Spec struct { // On both the client and the server, Protocol is the RPC protocol in use. // Currently, it's either [ProtocolConnect], [ProtocolGRPC], or // [ProtocolGRPCWeb], but additional protocols may be added in the future. +// +// Query contains the query parameters for the request. For the server, this +// will reflect the actual query parameters sent. For the client, it is unset. type Peer struct { Addr string Protocol string + Query url.Values // server-only } func newPeerFromURL(url *url.URL, protocol string) Peer { diff --git a/duplex_http_call.go b/duplex_http_call.go index 177089cd..77a09a4d 100644 --- a/duplex_http_call.go +++ b/duplex_http_call.go @@ -142,6 +142,16 @@ func (d *duplexHTTPCall) Trailer() http.Header { return d.request.Trailer } +// URL returns the URL for the request. +func (d *duplexHTTPCall) URL() *url.URL { + return d.request.URL +} + +// SetMethod changes the method of the request before it is sent. +func (d *duplexHTTPCall) SetMethod(method string) { + d.request.Method = method +} + // Read from the response body. Returns the first error passed to SetError. func (d *duplexHTTPCall) Read(data []byte) (int, error) { // First, we wait until we've gotten the response headers and established the diff --git a/handler.go b/handler.go index baa37465..89ceea13 100644 --- a/handler.go +++ b/handler.go @@ -30,6 +30,7 @@ type Handler struct { spec Spec implementation StreamingHandlerFunc protocolHandlers []protocolHandler + allowMethod string // Allow header acceptPost string // Accept-Post header } @@ -86,6 +87,7 @@ func NewUnaryHandler[Req, Res any]( spec: config.newSpec(StreamTypeUnary), implementation: implementation, protocolHandlers: protocolHandlers, + allowMethod: sortedAllowMethodValue(protocolHandlers), acceptPost: sortedAcceptPostValue(protocolHandlers), } } @@ -182,18 +184,25 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re return } - // The gRPC-HTTP2, gRPC-Web, and Connect protocols are all POST-only. - if request.Method != http.MethodPost { - responseWriter.Header().Set("Allow", http.MethodPost) + var protocolHandlers []protocolHandler + for _, handler := range h.protocolHandlers { + if _, ok := handler.Methods()[request.Method]; ok { + protocolHandlers = append(protocolHandlers, handler) + } + } + + if len(protocolHandlers) == 0 { + responseWriter.Header().Set("Allow", h.allowMethod) responseWriter.WriteHeader(http.StatusMethodNotAllowed) return } - // Find our implementation of the RPC protocol in use. contentType := canonicalizeContentType(getHeaderCanonical(request.Header, headerContentType)) + + // Find our implementation of the RPC protocol in use. var protocolHandler protocolHandler - for _, handler := range h.protocolHandlers { - if _, ok := handler.ContentTypes()[contentType]; ok { + for _, handler := range protocolHandlers { + if handler.CanHandlePayload(request, contentType) { protocolHandler = handler break } @@ -239,6 +248,7 @@ type handlerConfig struct { HandleGRPC bool HandleGRPCWeb bool RequireConnectProtocolHeader bool + IdempotencyLevel IdempotencyLevel BufferPool *bufferPool ReadMaxBytes int SendMaxBytes int @@ -265,8 +275,9 @@ func newHandlerConfig(procedure string, options []HandlerOption) *handlerConfig func (c *handlerConfig) newSpec(streamType StreamType) Spec { return Spec{ - Procedure: c.Procedure, - StreamType: streamType, + Procedure: c.Procedure, + StreamType: streamType, + IdempotencyLevel: c.IdempotencyLevel, } } @@ -294,6 +305,7 @@ func (c *handlerConfig) newProtocolHandlers(streamType StreamType) []protocolHan ReadMaxBytes: c.ReadMaxBytes, SendMaxBytes: c.SendMaxBytes, RequireConnectProtocolHeader: c.RequireConnectProtocolHeader, + IdempotencyLevel: c.IdempotencyLevel, })) } return handlers @@ -314,6 +326,7 @@ func newStreamHandler( spec: config.newSpec(streamType), implementation: implementation, protocolHandlers: protocolHandlers, + allowMethod: sortedAllowMethodValue(protocolHandlers), acceptPost: sortedAcceptPostValue(protocolHandlers), } } diff --git a/handler_ext_test.go b/handler_ext_test.go index e7b2c0d8..9e5ca1f3 100644 --- a/handler_ext_test.go +++ b/handler_ext_test.go @@ -35,13 +35,14 @@ func TestHandler_ServeHTTP(t *testing.T) { successPingServer{}, )) const pingProcedure = "/" + pingv1connect.PingServiceName + "/Ping" + const sumProcedure = "/" + pingv1connect.PingServiceName + "/Sum" server := httptest.NewServer(mux) client := server.Client() t.Cleanup(func() { server.Close() }) - t.Run("method_not_allowed", func(t *testing.T) { + t.Run("get_method_no_encoding", func(t *testing.T) { t.Parallel() request, err := http.NewRequestWithContext( context.Background(), @@ -53,6 +54,51 @@ func TestHandler_ServeHTTP(t *testing.T) { resp, err := client.Do(request) assert.Nil(t, err) defer resp.Body.Close() + assert.Equal(t, resp.StatusCode, http.StatusUnsupportedMediaType) + }) + + t.Run("get_method_bad_encoding", func(t *testing.T) { + t.Parallel() + request, err := http.NewRequestWithContext( + context.Background(), + http.MethodGet, + server.URL+pingProcedure+`?encoding=unk&message={}`, + strings.NewReader(""), + ) + assert.Nil(t, err) + resp, err := client.Do(request) + assert.Nil(t, err) + defer resp.Body.Close() + assert.Equal(t, resp.StatusCode, http.StatusUnsupportedMediaType) + }) + + t.Run("idempotent_get_method", func(t *testing.T) { + t.Parallel() + request, err := http.NewRequestWithContext( + context.Background(), + http.MethodGet, + server.URL+pingProcedure+`?encoding=json&message={}`, + strings.NewReader(""), + ) + assert.Nil(t, err) + resp, err := client.Do(request) + assert.Nil(t, err) + defer resp.Body.Close() + assert.Equal(t, resp.StatusCode, http.StatusOK) + }) + + t.Run("method_not_allowed", func(t *testing.T) { + t.Parallel() + request, err := http.NewRequestWithContext( + context.Background(), + http.MethodGet, + server.URL+sumProcedure, + strings.NewReader(""), + ) + assert.Nil(t, err) + resp, err := client.Do(request) + assert.Nil(t, err) + defer resp.Body.Close() assert.Equal(t, resp.StatusCode, http.StatusMethodNotAllowed) assert.Equal(t, resp.Header.Get("Allow"), http.MethodPost) }) diff --git a/idempotency_level.go b/idempotency_level.go new file mode 100644 index 00000000..6428d67e --- /dev/null +++ b/idempotency_level.go @@ -0,0 +1,68 @@ +// Copyright 2021-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connect + +import "fmt" + +// An IdempotencyLevel is a value that declares how "idempotent" an RPC is. This +// value can affect RPC behaviors, such as determining whether it is safe to +// retry a request, or what kinds of request modalities are allowed for a given +// procedure. +type IdempotencyLevel int + +// NOTE: For simplicity, these should be kept in sync with the values of the +// google.protobuf.MethodOptions.IdempotencyLevel enumeration. + +const ( + // IdempotencyUnknown is the default idempotency level. A procedure with + // this idempotency level may not be idempotent. This is appropriate for + // any kind of procedure. + IdempotencyUnknown IdempotencyLevel = 0 + + // IdempotencyNoSideEffects is the idempotency level that specifies that a + // given call has no side-effects. This is equivalent to [RFC 9110 § 9.2.1] + // "safe" methods in terms of semantics. This procedure should not mutate + // any state. This idempotency level is appropriate for queries, or anything + // that would be suitable for an HTTP GET request. In addition, due to the + // lack of side-effects, such a procedure would be suitable to retry and + // expect that the results will not be altered by preceding attempts. + // + // [RFC 9110 § 9.2.1]: https://www.rfc-editor.org/rfc/rfc9110.html#section-9.2.1 + IdempotencyNoSideEffects IdempotencyLevel = 1 + + // IdempotencyIdempotent is the idempotency level that specifies that a + // given call is "idempotent", such that multiple instances of the same + // request to this procedure would have the same side-effects as a single + // request. This is equivalent to [RFC 9110 § 9.2.2] "idempotent" methods. + // This level is a subset of the previous level. This idempotency level is + // appropriate for any procedure that is safe to retry multiple times + // and be guaranteed that the response and side-effects will not be altered + // as a result of multiple attempts, for example, entity deletion requests. + // + // [RFC 9110 § 9.2.2]: https://www.rfc-editor.org/rfc/rfc9110.html#section-9.2.2 + IdempotencyIdempotent IdempotencyLevel = 2 +) + +func (i IdempotencyLevel) String() string { + switch i { + case IdempotencyUnknown: + return "idempotency_unknown" + case IdempotencyNoSideEffects: + return "no_side_effects" + case IdempotencyIdempotent: + return "idempotent" + } + return fmt.Sprintf("idempotency_%d", i) +} diff --git a/internal/gen/connect/ping/v1/ping.pb.go b/internal/gen/connect/ping/v1/ping.pb.go index 77b60416..eac68ce1 100644 --- a/internal/gen/connect/ping/v1/ping.pb.go +++ b/internal/gen/connect/ping/v1/ping.pb.go @@ -548,44 +548,44 @@ var file_connect_ping_v1_ping_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, 0x22, 0x0a, 0x0e, 0x43, 0x75, 0x6d, 0x53, 0x75, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x73, 0x75, 0x6d, - 0x32, 0x84, 0x03, 0x0a, 0x0b, 0x50, 0x69, 0x6e, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x12, 0x45, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1c, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, + 0x32, 0x87, 0x03, 0x0a, 0x0b, 0x50, 0x69, 0x6e, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x48, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1c, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x04, 0x46, 0x61, 0x69, 0x6c, 0x12, - 0x1c, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x76, - 0x31, 0x2e, 0x46, 0x61, 0x69, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, - 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, - 0x46, 0x61, 0x69, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x44, - 0x0a, 0x03, 0x53, 0x75, 0x6d, 0x12, 0x1b, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, - 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, - 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x28, 0x01, 0x12, 0x50, 0x0a, 0x07, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x55, 0x70, 0x12, - 0x1f, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x76, - 0x31, 0x2e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x55, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x20, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, - 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x55, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x4f, 0x0a, 0x06, 0x43, 0x75, 0x6d, 0x53, 0x75, 0x6d, - 0x12, 0x1e, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, - 0x76, 0x31, 0x2e, 0x43, 0x75, 0x6d, 0x53, 0x75, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1f, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, - 0x76, 0x31, 0x2e, 0x43, 0x75, 0x6d, 0x53, 0x75, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0xc2, 0x01, 0x0a, 0x13, 0x63, 0x6f, 0x6d, 0x2e, - 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x42, - 0x09, 0x50, 0x69, 0x6e, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x42, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x75, 0x66, 0x62, 0x75, 0x69, 0x6c, - 0x64, 0x2f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2d, 0x67, 0x6f, 0x2f, 0x69, 0x6e, 0x74, - 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, - 0x74, 0x2f, 0x70, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x3b, 0x70, 0x69, 0x6e, 0x67, 0x76, 0x31, - 0xa2, 0x02, 0x03, 0x43, 0x50, 0x58, 0xaa, 0x02, 0x0f, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, - 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0f, 0x43, 0x6f, 0x6e, 0x6e, 0x65, - 0x63, 0x74, 0x5c, 0x50, 0x69, 0x6e, 0x67, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x1b, 0x43, 0x6f, 0x6e, - 0x6e, 0x65, 0x63, 0x74, 0x5c, 0x50, 0x69, 0x6e, 0x67, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, - 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x11, 0x43, 0x6f, 0x6e, 0x6e, 0x65, - 0x63, 0x74, 0x3a, 0x3a, 0x50, 0x69, 0x6e, 0x67, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x03, 0x90, 0x02, 0x01, 0x12, 0x45, 0x0a, 0x04, 0x46, 0x61, + 0x69, 0x6c, 0x12, 0x1c, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, + 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x61, 0x69, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, + 0x76, 0x31, 0x2e, 0x46, 0x61, 0x69, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x12, 0x44, 0x0a, 0x03, 0x53, 0x75, 0x6d, 0x12, 0x1b, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x6d, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, + 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x12, 0x50, 0x0a, 0x07, 0x43, 0x6f, 0x75, 0x6e, 0x74, + 0x55, 0x70, 0x12, 0x1f, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, + 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x55, 0x70, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, + 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x55, 0x70, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x4f, 0x0a, 0x06, 0x43, 0x75, 0x6d, + 0x53, 0x75, 0x6d, 0x12, 0x1e, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, + 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x75, 0x6d, 0x53, 0x75, 0x6d, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, + 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x75, 0x6d, 0x53, 0x75, 0x6d, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0xc2, 0x01, 0x0a, 0x13, 0x63, + 0x6f, 0x6d, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x70, 0x69, 0x6e, 0x67, 0x2e, + 0x76, 0x31, 0x42, 0x09, 0x50, 0x69, 0x6e, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, + 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x75, 0x66, 0x62, + 0x75, 0x69, 0x6c, 0x64, 0x2f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2d, 0x67, 0x6f, 0x2f, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x2f, 0x70, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x3b, 0x70, 0x69, 0x6e, + 0x67, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x43, 0x50, 0x58, 0xaa, 0x02, 0x0f, 0x43, 0x6f, 0x6e, 0x6e, + 0x65, 0x63, 0x74, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0f, 0x43, 0x6f, + 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x5c, 0x50, 0x69, 0x6e, 0x67, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x1b, + 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x5c, 0x50, 0x69, 0x6e, 0x67, 0x5c, 0x56, 0x31, 0x5c, + 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x11, 0x43, 0x6f, + 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x3a, 0x3a, 0x50, 0x69, 0x6e, 0x67, 0x3a, 0x3a, 0x56, 0x31, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/gen/connect/ping/v1/pingv1connect/ping.connect.go b/internal/gen/connect/ping/v1/pingv1connect/ping.connect.go index 617bbbc8..4825c316 100644 --- a/internal/gen/connect/ping/v1/pingv1connect/ping.connect.go +++ b/internal/gen/connect/ping/v1/pingv1connect/ping.connect.go @@ -37,7 +37,7 @@ import ( // generated with a version of connect newer than the one compiled into your binary. You can fix the // problem by either regenerating this code with an older version of connect or updating the connect // version compiled into your binary. -const _ = connect_go.IsAtLeastVersion0_1_0 +const _ = connect_go.IsAtLeastVersion1_6_0 const ( // PingServiceName is the fully-qualified name of the PingService service. @@ -91,7 +91,8 @@ func NewPingServiceClient(httpClient connect_go.HTTPClient, baseURL string, opts ping: connect_go.NewClient[v1.PingRequest, v1.PingResponse]( httpClient, baseURL+PingServicePingProcedure, - opts..., + connect_go.WithIdempotency(connect_go.IdempotencyNoSideEffects), + connect_go.WithClientOptions(opts...), ), fail: connect_go.NewClient[v1.FailRequest, v1.FailResponse]( httpClient, @@ -174,7 +175,8 @@ func NewPingServiceHandler(svc PingServiceHandler, opts ...connect_go.HandlerOpt mux.Handle(PingServicePingProcedure, connect_go.NewUnaryHandler( PingServicePingProcedure, svc.Ping, - opts..., + connect_go.WithIdempotency(connect_go.IdempotencyNoSideEffects), + connect_go.WithHandlerOptions(opts...), )) mux.Handle(PingServiceFailProcedure, connect_go.NewUnaryHandler( PingServiceFailProcedure, diff --git a/internal/proto/connect/ping/v1/ping.proto b/internal/proto/connect/ping/v1/ping.proto index b113fff7..fd8669d0 100644 --- a/internal/proto/connect/ping/v1/ping.proto +++ b/internal/proto/connect/ping/v1/ping.proto @@ -62,7 +62,9 @@ message CumSumResponse { service PingService { // Ping sends a ping to the server to determine if it's reachable. - rpc Ping(PingRequest) returns (PingResponse) {} + rpc Ping(PingRequest) returns (PingResponse) { + option idempotency_level = NO_SIDE_EFFECTS; + } // Fail always fails. rpc Fail(FailRequest) returns (FailResponse) {} // Sum calculates the sum of the numbers sent on the stream. diff --git a/option.go b/option.go index a997b71a..427bc8d2 100644 --- a/option.go +++ b/option.go @@ -230,6 +230,36 @@ func WithSendMaxBytes(max int) Option { return &sendMaxBytesOption{Max: max} } +// WithIdempotency declares the idempotency of the procedure. This can determine +// whether a procedure call can safely be retried, and may affect which request +// modalities are allowed for a given procedure call. +// +// In most cases, you should not need to manually set this. It is normally set +// by the code generator for your schema. For protobuf schemas, it can be set like this: +// +// rpc Ping(PingRequest) returns (PingResponse) { +// option idempotency_level = NO_SIDE_EFFECTS; +// } +func WithIdempotency(idempotencyLevel IdempotencyLevel) Option { + return &idempotencyOption{idempotencyLevel: idempotencyLevel} +} + +// WithHTTPGet allows Connect-protocol clients to use HTTP GET requests for +// side-effect free unary RPC calls. Typically, the service schema indicates +// which procedures are idempotent (see [WithIdempotency] for an example +// protobuf schema). The gRPC and gRPC-Web protocols are POST-only, so this +// option has no effect when combined with [WithGRPC] or [WithGRPCWeb]. +// +// Using HTTP GET requests makes it easier to take advantage of CDNs, caching +// reverse proxies, and browsers' built-in caching. Note, however, that servers +// don't automatically set any cache headers; you can set cache headers using +// interceptors or by adding headers in individual procedure implementations. +// +// By default, all requests are made as HTTP POSTs. +func WithHTTPGet() ClientOption { + return &enableGet{} +} + // WithInterceptors configures a client or handler's interceptor stack. Repeated // WithInterceptors options are applied in order, so // @@ -399,6 +429,18 @@ func (o *requireConnectProtocolHeaderOption) applyToHandler(config *handlerConfi config.RequireConnectProtocolHeader = true } +type idempotencyOption struct { + idempotencyLevel IdempotencyLevel +} + +func (o *idempotencyOption) applyToClient(config *clientConfig) { + config.IdempotencyLevel = o.idempotencyLevel +} + +func (o *idempotencyOption) applyToHandler(config *handlerConfig) { + config.IdempotencyLevel = o.idempotencyLevel +} + type grpcOption struct { web bool } @@ -407,6 +449,44 @@ func (o *grpcOption) applyToClient(config *clientConfig) { config.Protocol = &protocolGRPC{web: o.web} } +type enableGet struct{} + +func (o *enableGet) applyToClient(config *clientConfig) { + config.EnableGet = true +} + +// withHTTPGetMaxURLSize sets the maximum allowable URL length for GET requests +// made using the Connect protocol. It has no effect on gRPC or gRPC-Web +// clients, since those protocols are POST-only. +// +// Limiting the URL size is useful as most user agents, proxies, and servers +// have limits on the allowable length of a URL. For example, Apache and Nginx +// limit the size of a request line to around 8 KiB, meaning that maximum +// length of a URL is a bit smaller than this. If you run into URL size +// limitations imposed by your network infrastructure and don't know the +// maximum allowable size, or if you'd prefer to be cautious from the start, a +// 4096 byte (4 KiB) limit works with most common proxies and CDNs. +// +// If fallback is set to true and the URL would be longer than the configured +// maximum value, the request will be sent as an HTTP POST instead. If fallback +// is set to false, the request will fail with [CodeResourceExhausted]. +// +// By default, Connect-protocol clients with GET requests enabled may send a +// URL of any size. +func withHTTPGetMaxURLSize(bytes int, fallback bool) ClientOption { + return &getURLMaxBytes{Max: bytes, Fallback: fallback} +} + +type getURLMaxBytes struct { + Max int + Fallback bool +} + +func (o *getURLMaxBytes) applyToClient(config *clientConfig) { + config.GetURLMaxBytes = o.Max + config.GetUseFallback = o.Fallback +} + type interceptorsOption struct { Interceptors []Interceptor } diff --git a/protocol.go b/protocol.go index 3e7b1943..77e899ed 100644 --- a/protocol.go +++ b/protocol.go @@ -78,11 +78,15 @@ type protocolHandlerParams struct { ReadMaxBytes int SendMaxBytes int RequireConnectProtocolHeader bool + IdempotencyLevel IdempotencyLevel } // Handler is the server side of a protocol. HTTP handlers typically support // multiple protocols, codecs, and compressors. type protocolHandler interface { + // Methods is the set of HTTP methods the protocol can handle. + Methods() map[string]struct{} + // ContentTypes is the set of HTTP Content-Types that the protocol can // handle. ContentTypes() map[string]struct{} @@ -95,6 +99,11 @@ type protocolHandler interface { // request's context, a nil cancellation function, and a nil error. SetTimeout(*http.Request) (context.Context, context.CancelFunc, error) + // CanHandlePayload returns true if the protocol can handle an HTTP request. + // This is called after the request method is validated, so we only need to + // be concerned with the content type/payload specifically. + CanHandlePayload(*http.Request, string) bool + // NewConn constructs a HandlerConn for the message exchange. NewConn(http.ResponseWriter, *http.Request) (handlerConnCloser, bool) } @@ -113,6 +122,9 @@ type protocolClientParams struct { BufferPool *bufferPool ReadMaxBytes int SendMaxBytes int + EnableGet bool + GetURLMaxBytes int + GetUseFallback bool // The gRPC family of protocols always needs access to a Protobuf codec to // marshal and unmarshal errors. Protobuf Codec @@ -222,6 +234,21 @@ func sortedAcceptPostValue(handlers []protocolHandler) string { return strings.Join(accept, ", ") } +func sortedAllowMethodValue(handlers []protocolHandler) string { + methods := make(map[string]struct{}) + for _, handler := range handlers { + for method := range handler.Methods() { + methods[method] = struct{}{} + } + } + allow := make([]string, 0, len(methods)) + for ct := range methods { + allow = append(allow, ct) + } + sort.Strings(allow) + return strings.Join(allow, ", ") +} + func isCommaOrSpace(c rune) bool { return c == ',' || c == ' ' } diff --git a/protocol_connect.go b/protocol_connect.go index 86da7c6e..6b935c51 100644 --- a/protocol_connect.go +++ b/protocol_connect.go @@ -24,6 +24,7 @@ import ( "io" "math" "net/http" + "net/url" "runtime" "strconv" "strings" @@ -41,12 +42,20 @@ const ( connectHeaderTimeout = "Connect-Timeout-Ms" connectHeaderProtocolVersion = "Connect-Protocol-Version" connectProtocolVersion = "1" + headerVary = "Vary" connectFlagEnvelopeEndStream = 0b00000010 connectUnaryContentTypePrefix = "application/" connectUnaryContentTypeJSON = connectUnaryContentTypePrefix + "json" connectStreamingContentTypePrefix = "application/connect+" + + connectUnaryEncodingQueryParameter = "encoding" + connectUnaryMessageQueryParameter = "message" + connectUnaryBase64QueryParameter = "base64" + connectUnaryCompressionQueryParameter = "compression" + connectUnaryConnectQueryParameter = "connect" + connectUnaryConnectQueryValue = "v" + connectProtocolVersion ) // defaultConnectUserAgent returns a User-Agent string similar to those used in gRPC. @@ -58,6 +67,13 @@ type protocolConnect struct{} // NewHandler implements protocol, so it must return an interface. func (*protocolConnect) NewHandler(params *protocolHandlerParams) protocolHandler { + methods := make(map[string]struct{}) + methods[http.MethodPost] = struct{}{} + + if params.Spec.StreamType == StreamTypeUnary && params.IdempotencyLevel == IdempotencyNoSideEffects { + methods[http.MethodGet] = struct{}{} + } + contentTypes := make(map[string]struct{}) for _, name := range params.Codecs.Names() { if params.Spec.StreamType == StreamTypeUnary { @@ -66,8 +82,10 @@ func (*protocolConnect) NewHandler(params *protocolHandlerParams) protocolHandle } contentTypes[canonicalizeContentType(connectStreamingContentTypePrefix+name)] = struct{}{} } + return &connectHandler{ protocolHandlerParams: *params, + methods: methods, accept: contentTypes, } } @@ -83,7 +101,12 @@ func (*protocolConnect) NewClient(params *protocolClientParams) (protocolClient, type connectHandler struct { protocolHandlerParams - accept map[string]struct{} + methods map[string]struct{} + accept map[string]struct{} +} + +func (h *connectHandler) Methods() map[string]struct{} { + return h.methods } func (h *connectHandler) ContentTypes() map[string]struct{} { @@ -109,15 +132,33 @@ func (*connectHandler) SetTimeout(request *http.Request) (context.Context, conte return ctx, cancel, nil } +func (h *connectHandler) CanHandlePayload(request *http.Request, contentType string) bool { + if request.Method == http.MethodGet { + query := request.URL.Query() + codecName := query.Get(connectUnaryEncodingQueryParameter) + contentType = connectContentTypeFromCodecName( + h.Spec.StreamType, + codecName, + ) + } + _, ok := h.accept[contentType] + return ok +} + func (h *connectHandler) NewConn( responseWriter http.ResponseWriter, request *http.Request, ) (handlerConnCloser, bool) { + query := request.URL.Query() // We need to parse metadata before entering the interceptor stack; we'll // send the error to the client later on. var contentEncoding, acceptEncoding string if h.Spec.StreamType == StreamTypeUnary { - contentEncoding = getHeaderCanonical(request.Header, connectUnaryHeaderCompression) + if request.Method == http.MethodGet { + contentEncoding = query.Get(connectUnaryCompressionQueryParameter) + } else { + contentEncoding = getHeaderCanonical(request.Header, connectUnaryHeaderCompression) + } acceptEncoding = getHeaderCanonical(request.Header, connectUnaryHeaderAcceptCompression) } else { contentEncoding = getHeaderCanonical(request.Header, connectStreamingHeaderCompression) @@ -131,7 +172,15 @@ func (h *connectHandler) NewConn( if failed == nil { failed = checkServerStreamsCanFlush(h.Spec, responseWriter) } - if failed == nil { + if failed == nil && request.Method == http.MethodGet { + version := query.Get(connectUnaryConnectQueryParameter) + if version == "" && h.RequireConnectProtocolHeader { + failed = errorf(CodeInvalidArgument, "missing required query parameter: set %s to %q", connectUnaryConnectQueryParameter, connectUnaryConnectQueryValue) + } else if version != "" && version != connectUnaryConnectQueryValue { + failed = errorf(CodeInvalidArgument, "%s must be %q: got %q", connectUnaryConnectQueryParameter, connectUnaryConnectQueryValue, version) + } + } + if failed == nil && request.Method == http.MethodPost { version := getHeaderCanonical(request.Header, connectHeaderProtocolVersion) if version == "" && h.RequireConnectProtocolHeader { failed = errorf(CodeInvalidArgument, "missing required header: set %s to %q", connectHeaderProtocolVersion, connectProtocolVersion) @@ -140,6 +189,38 @@ func (h *connectHandler) NewConn( } } + var requestBody io.ReadCloser + var contentType, codecName string + if request.Method == http.MethodGet { + if failed == nil && !query.Has(connectUnaryEncodingQueryParameter) { + failed = errorf(CodeInvalidArgument, "missing %s parameter", connectUnaryEncodingQueryParameter) + } else if failed == nil && !query.Has(connectUnaryMessageQueryParameter) { + failed = errorf(CodeInvalidArgument, "missing %s parameter", connectUnaryMessageQueryParameter) + } + msg := query.Get(connectUnaryMessageQueryParameter) + msgReader := queryValueReader(msg, query.Get(connectUnaryBase64QueryParameter) == "1") + requestBody = io.NopCloser(msgReader) + codecName = query.Get(connectUnaryEncodingQueryParameter) + contentType = connectContentTypeFromCodecName( + h.Spec.StreamType, + codecName, + ) + } else { + requestBody = request.Body + contentType = getHeaderCanonical(request.Header, headerContentType) + codecName = connectCodecFromContentType( + h.Spec.StreamType, + contentType, + ) + } + + codec := h.Codecs.Get(codecName) + // The codec can be nil in the GET request case; that's okay: when failed + // is non-nil, codec is never used. + if failed == nil && codec == nil { + failed = errorf(CodeInvalidArgument, "invalid message encoding: %q", codecName) + } + // Write any remaining headers here: // (1) any writes to the stream will implicitly send the headers, so we // should get all of gRPC's required response headers ready. @@ -148,7 +229,7 @@ func (h *connectHandler) NewConn( // Since we know that these header keys are already in canonical form, we can // skip the normalization in Header.Set. header := responseWriter.Header() - header[headerContentType] = []string{getHeaderCanonical(request.Header, headerContentType)} + header[headerContentType] = []string{contentType} acceptCompressionHeader := connectUnaryHeaderAcceptCompression if h.Spec.StreamType != StreamTypeUnary { acceptCompressionHeader = connectStreamingHeaderAcceptCompression @@ -162,16 +243,11 @@ func (h *connectHandler) NewConn( } header[acceptCompressionHeader] = []string{h.CompressionPools.CommaSeparatedNames()} - codecName := connectCodecFromContentType( - h.Spec.StreamType, - getHeaderCanonical(request.Header, headerContentType), - ) - codec := h.Codecs.Get(codecName) // handler.go guarantees this is not nil - var conn handlerConnCloser peer := Peer{ Addr: request.RemoteAddr, Protocol: ProtocolConnect, + Query: query, } if h.Spec.StreamType == StreamTypeUnary { conn = &connectUnaryHandlerConn{ @@ -190,7 +266,7 @@ func (h *connectHandler) NewConn( sendMaxBytes: h.SendMaxBytes, }, unmarshaler: connectUnaryUnmarshaler{ - reader: request.Body, + reader: requestBody, codec: codec, compressionPool: h.CompressionPools.Get(requestCompression), bufferPool: h.BufferPool, @@ -216,7 +292,7 @@ func (h *connectHandler) NewConn( }, unmarshaler: connectStreamingUnmarshaler{ envelopeReader: envelopeReader{ - reader: request.Body, + reader: requestBody, codec: codec, compressionPool: h.CompressionPools.Get(requestCompression), bufferPool: h.BufferPool, @@ -299,15 +375,17 @@ func (c *connectClient) NewConn( duplexCall: duplexCall, compressionPools: c.CompressionPools, bufferPool: c.BufferPool, - marshaler: connectUnaryMarshaler{ - writer: duplexCall, - codec: c.Codec, - compressMinBytes: c.CompressMinBytes, - compressionName: c.CompressionName, - compressionPool: c.CompressionPools.Get(c.CompressionName), - bufferPool: c.BufferPool, - header: duplexCall.Header(), - sendMaxBytes: c.SendMaxBytes, + marshaler: connectUnaryRequestMarshaler{ + connectUnaryMarshaler: connectUnaryMarshaler{ + writer: duplexCall, + codec: c.Codec, + compressMinBytes: c.CompressMinBytes, + compressionName: c.CompressionName, + compressionPool: c.CompressionPools.Get(c.CompressionName), + bufferPool: c.BufferPool, + header: duplexCall.Header(), + sendMaxBytes: c.SendMaxBytes, + }, }, unmarshaler: connectUnaryUnmarshaler{ reader: duplexCall, @@ -318,6 +396,15 @@ func (c *connectClient) NewConn( responseHeader: make(http.Header), responseTrailer: make(http.Header), } + if spec.IdempotencyLevel == IdempotencyNoSideEffects { + unaryConn.marshaler.enableGet = c.EnableGet + unaryConn.marshaler.getURLMaxBytes = c.GetURLMaxBytes + unaryConn.marshaler.getUseFallback = c.GetUseFallback + unaryConn.marshaler.duplexCall = duplexCall + if stableCodec, ok := c.Codec.(stableCodec); ok { + unaryConn.marshaler.stableCodec = stableCodec + } + } conn = unaryConn duplexCall.SetValidateResponse(unaryConn.validateResponse) } else { @@ -361,7 +448,7 @@ type connectUnaryClientConn struct { duplexCall *duplexHTTPCall compressionPools readOnlyCompressionPools bufferPool *bufferPool - marshaler connectUnaryMarshaler + marshaler connectUnaryRequestMarshaler unmarshaler connectUnaryUnmarshaler responseHeader http.Header responseTrailer http.Header @@ -623,6 +710,12 @@ func (hc *connectUnaryHandlerConn) Close(err error) error { func (hc *connectUnaryHandlerConn) writeResponseHeader(err error) { header := hc.responseWriter.Header() + if hc.request.Method == http.MethodGet { + // The response content varies depending on the compression that the client + // requested (if any). GETs are potentially cacheable, so we should ensure + // that the Vary header includes at least Accept-Encoding (and not overwrite any values already set). + header[headerVary] = append(header[headerVary], connectUnaryHeaderAcceptCompression) + } if err != nil { if connectErr, ok := asError(err); ok { mergeHeaders(header, connectErr.meta) @@ -811,6 +904,110 @@ func (m *connectUnaryMarshaler) write(data []byte) *Error { return nil } +type connectUnaryRequestMarshaler struct { + connectUnaryMarshaler + + enableGet bool + getURLMaxBytes int + getUseFallback bool + stableCodec stableCodec + duplexCall *duplexHTTPCall +} + +func (m *connectUnaryRequestMarshaler) Marshal(message any) *Error { + if m.enableGet { + if m.stableCodec == nil && !m.getUseFallback { + return errorf(CodeInternal, "codec %s doesn't support stable marshal; cam't use get", m.codec.Name()) + } + if m.stableCodec != nil { + return m.marshalWithGet(message) + } + } + return m.connectUnaryMarshaler.Marshal(message) +} + +func (m *connectUnaryRequestMarshaler) marshalWithGet(message any) *Error { + // TODO(jchadwick-buf): This function is mostly a superset of + // connectUnaryMarshaler.Marshal. This should be reconciled at some point. + var data []byte + var err error + if message != nil { + data, err = m.stableCodec.MarshalStable(message) + if err != nil { + return errorf(CodeInternal, "marshal message stable: %w", err) + } + } + isTooBig := m.sendMaxBytes > 0 && len(data) > m.sendMaxBytes + if isTooBig && m.compressionPool == nil { + return NewError(CodeResourceExhausted, fmt.Errorf( + "message size %d exceeds sendMaxBytes %d: enabling request compression may help", + len(data), + m.sendMaxBytes, + )) + } + if !isTooBig { + url := m.buildGetURL(data, false /* compressed */) + if m.getURLMaxBytes <= 0 || len(url.String()) < m.getURLMaxBytes { + return m.writeWithGet(url) + } + if m.compressionPool == nil { + if m.getUseFallback { + return m.write(data) + } + return NewError(CodeResourceExhausted, fmt.Errorf( + "url size %d exceeds getURLMaxBytes %d: enabling request compression may help", + len(url.String()), + m.getURLMaxBytes, + )) + } + } + // Compress message to try to make it fit in the URL. + uncompressed := bytes.NewBuffer(data) + defer m.bufferPool.Put(uncompressed) + compressed := m.bufferPool.Get() + defer m.bufferPool.Put(compressed) + if err := m.compressionPool.Compress(compressed, uncompressed); err != nil { + return err + } + if m.sendMaxBytes > 0 && compressed.Len() > m.sendMaxBytes { + return NewError(CodeResourceExhausted, fmt.Errorf("compressed message size %d exceeds sendMaxBytes %d", compressed.Len(), m.sendMaxBytes)) + } + url := m.buildGetURL(compressed.Bytes(), true /* compressed */) + if m.getURLMaxBytes <= 0 || len(url.String()) < m.getURLMaxBytes { + return m.writeWithGet(url) + } + if m.getUseFallback { + setHeaderCanonical(m.header, connectUnaryHeaderCompression, m.compressionName) + return m.write(compressed.Bytes()) + } + return NewError(CodeResourceExhausted, fmt.Errorf("compressed url size %d exceeds getURLMaxBytes %d", len(url.String()), m.getURLMaxBytes)) +} + +func (m *connectUnaryRequestMarshaler) buildGetURL(data []byte, compressed bool) *url.URL { + url := *m.duplexCall.URL() + query := url.Query() + query.Set(connectUnaryConnectQueryParameter, connectUnaryConnectQueryValue) + query.Set(connectUnaryEncodingQueryParameter, m.codec.Name()) + if m.stableCodec.IsBinary() || compressed { + query.Set(connectUnaryMessageQueryParameter, encodeBinaryQueryValue(data)) + query.Set(connectUnaryBase64QueryParameter, "1") + } else { + query.Set(connectUnaryMessageQueryParameter, string(data)) + } + if compressed { + query.Set(connectUnaryCompressionQueryParameter, m.compressionName) + } + url.RawQuery = query.Encode() + return &url +} + +func (m *connectUnaryRequestMarshaler) writeWithGet(url *url.URL) *Error { + delete(m.header, connectHeaderProtocolVersion) + m.duplexCall.SetMethod(http.MethodGet) + *m.duplexCall.URL() = *url + return nil +} + type connectUnaryUnmarshaler struct { reader io.Reader codec Codec @@ -1048,3 +1245,29 @@ func connectContentTypeFromCodecName(streamType StreamType, name string) string } return connectStreamingContentTypePrefix + name } + +// encodeBinaryQueryValue URL-safe base64-encodes data, without padding. +func encodeBinaryQueryValue(data []byte) string { + return base64.RawURLEncoding.EncodeToString(data) +} + +// binaryQueryValueReader creates a reader that can read either padded or +// unpadded URL-safe base64 from a string. +func binaryQueryValueReader(data string) io.Reader { + stringReader := strings.NewReader(data) + if len(data)%4 != 0 { + // Data definitely isn't padded. + return base64.NewDecoder(base64.RawURLEncoding, stringReader) + } + // Data is padded, or no padding was necessary. + return base64.NewDecoder(base64.URLEncoding, stringReader) +} + +// queryValueReader creates a reader for a string that may be URL-safe base64 +// encoded. +func queryValueReader(data string, base64Encoded bool) io.Reader { + if base64Encoded { + return binaryQueryValueReader(data) + } + return strings.NewReader(data) +} diff --git a/protocol_grpc.go b/protocol_grpc.go index 7188ff25..df188702 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -63,7 +63,10 @@ var ( {time.Minute, 'M'}, {time.Hour, 'H'}, } - grpcTimeoutUnitLookup = make(map[byte]time.Duration) + grpcTimeoutUnitLookup = make(map[byte]time.Duration) + grpcAllowedMethods = map[string]struct{}{ + http.MethodPost: {}, + } errTrailersWithoutGRPCStatus = fmt.Errorf("gRPC protocol error: no %s trailer", grpcHeaderStatus) // defaultGrpcUserAgent follows @@ -128,6 +131,10 @@ type grpcHandler struct { accept map[string]struct{} } +func (g *grpcHandler) Methods() map[string]struct{} { + return grpcAllowedMethods +} + func (g *grpcHandler) ContentTypes() map[string]struct{} { return g.accept } @@ -146,6 +153,11 @@ func (*grpcHandler) SetTimeout(request *http.Request) (context.Context, context. return ctx, cancel, nil } +func (g *grpcHandler) CanHandlePayload(request *http.Request, contentType string) bool { + _, ok := g.accept[contentType] + return ok +} + func (g *grpcHandler) NewConn( responseWriter http.ResponseWriter, request *http.Request,