diff --git a/internal/app/connectconformance/testsuites/data/connect_client_compressed_error_endstream.yaml b/internal/app/connectconformance/testsuites/data/connect_client_compressed_error_endstream.yaml new file mode 100644 index 00000000..72c1f9e2 --- /dev/null +++ b/internal/app/connectconformance/testsuites/data/connect_client_compressed_error_endstream.yaml @@ -0,0 +1,93 @@ +name: Connect Compressed Error and End-Stream +mode: TEST_MODE_CLIENT +relevantProtocols: + - PROTOCOL_CONNECT +relevantCompressions: + # Ideally, we'd run this sort of test for any/all compression encodings + # supported by the client. But, since it uses a raw HTTP response, we + # have to hard-code the compression (raw response config doesn't allow + # parameterizing the encoding). So we test with the most likely encoding + # to be implemented. + - COMPRESSION_GZIP +relevantCodecs: + - CODEC_PROTO +testCases: + - request: + testName: error/compressed + service: connectrpc.conformance.v1.ConformanceService + method: Unary + streamType: STREAM_TYPE_UNARY + requestMessages: + - "@type": type.googleapis.com/connectrpc.conformance.v1.UnaryRequest + responseDefinition: + rawResponse: + statusCode: 422 + headers: + - name: content-type + value: [ "application/json" ] + - name: content-encoding + value: [ "gzip" ] + unary: + text: | + { + "code": "out_of_range", + "message": "oops", + "details": [ + { + "type": "google.protobuf.FileDescriptorProto", + "value": "Cgp0ZXN0LnByb3Rv", + "debug": { "name": "test.proto" } + } + ] + } + compression: COMPRESSION_GZIP + expectedResponse: + error: + code: CODE_OUT_OF_RANGE + message: oops + details: + - "@type": type.googleapis.com/google.protobuf.FileDescriptorProto + name: "test.proto" + + - request: + testName: end-stream/compressed + service: connectrpc.conformance.v1.ConformanceService + method: ServerStream + streamType: STREAM_TYPE_SERVER_STREAM + requestMessages: + - "@type": type.googleapis.com/connectrpc.conformance.v1.ServerStreamRequest + responseDefinition: + rawResponse: + statusCode: 200 + headers: + - name: content-type + value: [ "application/connect+proto" ] + - name: connect-content-encoding + value: [ "gzip" ] + stream: + items: + - flags: 3 + payload: + text: | + { + "error": { + "code": "out_of_range", + "message": "oops", + "foobar": "baz", + "details": [ + { + "type": "google.protobuf.FileDescriptorProto", + "value": "Cgp0ZXN0LnByb3Rv", + "debug": { "name": "test.proto" } + } + ] + } + } + compression: COMPRESSION_GZIP + expectedResponse: + error: + code: CODE_OUT_OF_RANGE + message: oops + details: + - "@type": type.googleapis.com/google.protobuf.FileDescriptorProto + name: "test.proto" diff --git a/internal/app/connectconformance/testsuites/data/grpc_web_client_compressed_trailers.yaml b/internal/app/connectconformance/testsuites/data/grpc_web_client_compressed_trailers.yaml new file mode 100644 index 00000000..0945da85 --- /dev/null +++ b/internal/app/connectconformance/testsuites/data/grpc_web_client_compressed_trailers.yaml @@ -0,0 +1,49 @@ +name: gRPC-Web Compressed Trailers +mode: TEST_MODE_CLIENT +relevantProtocols: + - PROTOCOL_GRPC_WEB +relevantCompressions: + # Ideally, we'd run this sort of test for any/all compression encodings + # supported by the client. But, since it uses a raw HTTP response, we + # have to hard-code the compression (raw response config doesn't allow + # parameterizing the encoding). So we test with the most likely encoding + # to be implemented. + - COMPRESSION_GZIP +relevantCodecs: + - CODEC_PROTO +testCases: + - request: + testName: trailers-in-body/compressed + streamType: STREAM_TYPE_UNARY + requestMessages: + - "@type": type.googleapis.com/connectrpc.conformance.v1.UnaryRequest + responseDefinition: + rawResponse: + status_code: 200 + headers: + - name: "access-control-allow-origin" + value: [ "*" ] + - name: "access-control-expose-headers" + value: [ "*" ] + - name: content-type + value: [ "application/grpc-web" ] + - name: grpc-encoding + value: [ "gzip" ] + - name: x-custom-header + value: [ "foo" ] + stream: + items: + - flags: 129 + payload: + text: "grpc-status: 9\r\ngrpc-message: error\r\nx-custom-trailer: bing\r\n" + compression: COMPRESSION_GZIP + expectedResponse: + responseHeaders: + - name: x-custom-header + value: [ "foo" ] + error: + code: 9 + message: error + responseTrailers: + - name: x-custom-trailer + value: [ "bing" ] \ No newline at end of file diff --git a/internal/app/connectconformance/testsuites/data/grpc_web_client_trailers.yaml b/internal/app/connectconformance/testsuites/data/grpc_web_client_trailers.yaml index 3bc8d164..ca0c4897 100644 --- a/internal/app/connectconformance/testsuites/data/grpc_web_client_trailers.yaml +++ b/internal/app/connectconformance/testsuites/data/grpc_web_client_trailers.yaml @@ -4,8 +4,6 @@ relevantProtocols: - PROTOCOL_GRPC_WEB relevantCodecs: - CODEC_PROTO -relevantCompressions: - - COMPRESSION_IDENTITY # These tests verify that a gRPC-Web client can handle trailers in the body with # no response, trailers-only responses (trailers in headers), and trailers with # different cases (in addition to the "standard" all lower-case). diff --git a/internal/app/referenceclient/wire_details.go b/internal/app/referenceclient/wire_details.go index 79a9fbc7..00881f41 100644 --- a/internal/app/referenceclient/wire_details.go +++ b/internal/app/referenceclient/wire_details.go @@ -173,7 +173,12 @@ func examineWireDetails(ctx context.Context, printer internal.Printer) (statusCo case isUnaryJSONError(contentType, trace.Response.StatusCode): // If this is a unary request that returned an error, then use the entire // response body as the wire error details. - examineConnectError(wrapper.buf.Bytes(), printer) + decomp := tracer.GetDecompressor(trace.Response.Header.Get("content-encoding")) + if err := decomp.Reset(wrapper.buf); err == nil { + if body, err := io.ReadAll(decomp); err == nil { + examineConnectError(body, printer) + } + } case strings.HasPrefix(contentType, "application/connect+"): // If this is a streaming Connect request, then look through the trace events // for the ResponseBodyEndStream event and parse its content into an @@ -314,6 +319,7 @@ func examineConnectErrorDetail(i int, detailJSON json.RawMessage, printer intern decoded, err := base64.RawStdEncoding.DecodeString(str) if err != nil { printer.Printf(`%s: value for key "value", %q, is not valid unpadded base64-encoding: %v`, prefix, val, err) + detail.Value = nil // this will skip the comparison of "value" and "debug" info below break } decodedVal = decoded diff --git a/internal/app/referenceclient/wire_details_test.go b/internal/app/referenceclient/wire_details_test.go index 9ce15e36..8db6d143 100644 --- a/internal/app/referenceclient/wire_details_test.go +++ b/internal/app/referenceclient/wire_details_test.go @@ -15,6 +15,8 @@ package referenceclient import ( + "bytes" + "compress/gzip" "context" "encoding/binary" "encoding/json" @@ -36,6 +38,7 @@ func TestExamineConnectError(t *testing.T) { t.Parallel() testCases := []struct { name string + compressed bool endStream string expectedFeedback []string }{ @@ -67,6 +70,15 @@ func TestExamineConnectError(t *testing.T) { ] }`, }, + { + name: "compressed", + compressed: true, + endStream: ` + { + "code": "internal", + "message": "blah blah blah" + }`, + }, { name: "complete", endStream: ` @@ -287,8 +299,17 @@ func TestExamineConnectError(t *testing.T) { t.Parallel() svr := httptest.NewServer(http.HandlerFunc(func(respWriter http.ResponseWriter, req *http.Request) { respWriter.Header().Set("Content-Type", "application/json") + if testCase.compressed { + respWriter.Header().Set("Content-Encoding", "gzip") + } respWriter.WriteHeader(http.StatusBadRequest) - _, _ = respWriter.Write([]byte(testCase.endStream)) + if testCase.compressed { + w := gzip.NewWriter(respWriter) + _, _ = w.Write([]byte(testCase.endStream)) + _ = w.Close() + } else { + _, _ = respWriter.Write([]byte(testCase.endStream)) + } })) t.Cleanup(svr.Close) client := conformancev1connect.NewConformanceServiceClient( @@ -326,6 +347,7 @@ func TestExamineConnectEndStream(t *testing.T) { t.Parallel() testCases := []struct { name string + compressed bool endStream string expectedFeedback []string }{ @@ -344,6 +366,17 @@ func TestExamineConnectEndStream(t *testing.T) { endStream: ` {"error":{"code":"canceled"}}`, }, + { + name: "compressed", + compressed: true, + endStream: ` + { + "error": {"code": "invalid_argument", "message": "foobar"}, + "metadata":{ + "foo": ["bar", "baz"] + } + }`, + }, { name: "nulls", endStream: ` @@ -426,11 +459,25 @@ func TestExamineConnectEndStream(t *testing.T) { t.Parallel() svr := httptest.NewServer(http.HandlerFunc(func(respWriter http.ResponseWriter, req *http.Request) { respWriter.Header().Set("Content-Type", "application/connect+proto") - _, _ = respWriter.Write([]byte{2}) // just the end-stream flag + if testCase.compressed { + respWriter.Header().Set("Connect-Content-Encoding", "gzip") + _, _ = respWriter.Write([]byte{3}) // end-stream + compressed flags + } else { + _, _ = respWriter.Write([]byte{2}) // just the end-stream flag + } + + data := []byte(testCase.endStream) + if testCase.compressed { + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + _, _ = w.Write([]byte(testCase.endStream)) + _ = w.Close() + data = buf.Bytes() + } var size [4]byte - binary.BigEndian.PutUint32(size[:], uint32(len(testCase.endStream))) + binary.BigEndian.PutUint32(size[:], uint32(len(data))) _, _ = respWriter.Write(size[:]) - _, _ = respWriter.Write([]byte(testCase.endStream)) + _, _ = respWriter.Write(data) })) t.Cleanup(svr.Close) client := conformancev1connect.NewConformanceServiceClient( @@ -468,6 +515,7 @@ func TestExamineGRPCEndStream(t *testing.T) { t.Parallel() testCases := []struct { name string + compressed bool endStream string expectedFeedback []string }{ @@ -477,6 +525,13 @@ func TestExamineGRPCEndStream(t *testing.T) { "grpc-message: foo\r\n" + "blah-blah: foobar\r\n", }, + { + name: "compressed", + compressed: true, + endStream: "grpc-status: 6\r\n" + + "grpc-message: foo\r\n" + + "blah-blah: foobar\r\n", + }, { name: "allowed special chars in key", endStream: "grpc-status: 6\r\n" + @@ -584,11 +639,24 @@ func TestExamineGRPCEndStream(t *testing.T) { t.Parallel() svr := httptest.NewServer(http.HandlerFunc(func(respWriter http.ResponseWriter, req *http.Request) { respWriter.Header().Set("Content-Type", "application/grpc-web") - _, _ = respWriter.Write([]byte{128}) // just the end-stream flag + if testCase.compressed { + respWriter.Header().Set("Grpc-Encoding", "gzip") + _, _ = respWriter.Write([]byte{129}) // end-stream + compressed flags + } else { + _, _ = respWriter.Write([]byte{128}) // just the end-stream flag + } + data := []byte(testCase.endStream) + if testCase.compressed { + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + _, _ = w.Write([]byte(testCase.endStream)) + _ = w.Close() + data = buf.Bytes() + } var size [4]byte - binary.BigEndian.PutUint32(size[:], uint32(len(testCase.endStream))) + binary.BigEndian.PutUint32(size[:], uint32(len(data))) _, _ = respWriter.Write(size[:]) - _, _ = respWriter.Write([]byte(testCase.endStream)) + _, _ = respWriter.Write(data) })) t.Cleanup(svr.Close) client := conformancev1connect.NewConformanceServiceClient( diff --git a/internal/tracer/reader.go b/internal/tracer/reader.go index c7cae92b..0653ede9 100644 --- a/internal/tracer/reader.go +++ b/internal/tracer/reader.go @@ -25,8 +25,6 @@ import ( "sync" "sync/atomic" - "connectrpc.com/conformance/internal/compression" - conformancev1 "connectrpc.com/conformance/internal/gen/proto/go/connectrpc/conformance/v1" "connectrpc.com/connect" ) @@ -292,37 +290,12 @@ func propertiesFromHeaders(headers http.Header) (isStream bool, decomp connect.D } switch { case strings.HasPrefix(contentType, "application/connect"): - return true, getDecompressor(headers.Get("Connect-Content-Encoding")) + return true, GetDecompressor(headers.Get("Connect-Content-Encoding")) case strings.HasPrefix(contentType, "application/grpc"): - return true, getDecompressor(headers.Get("Grpc-Encoding")) + return true, GetDecompressor(headers.Get("Grpc-Encoding")) default: // We should only need a decompressor for streams (to decompress the end-stream message) // So for non-stream protocols, this no-op decompressor should suffice. return false, brokenDecompressor{} } } - -func getDecompressor(encoding string) connect.Decompressor { - var comp conformancev1.Compression - switch strings.ToLower(encoding) { - case "", "identity": - comp = conformancev1.Compression_COMPRESSION_IDENTITY - case "gzip": - comp = conformancev1.Compression_COMPRESSION_GZIP - case "br": - comp = conformancev1.Compression_COMPRESSION_BR - case "zstd": - comp = conformancev1.Compression_COMPRESSION_ZSTD - case "deflate": - comp = conformancev1.Compression_COMPRESSION_DEFLATE - case "snappy": - comp = conformancev1.Compression_COMPRESSION_SNAPPY - default: - return brokenDecompressor{} - } - decomp, err := compression.GetDecompressor(comp) - if err != nil { - return brokenDecompressor{} - } - return decomp -} diff --git a/internal/tracer/tracer.go b/internal/tracer/tracer.go index 3ad80b27..f737a12d 100644 --- a/internal/tracer/tracer.go +++ b/internal/tracer/tracer.go @@ -24,6 +24,9 @@ import ( "time" "connectrpc.com/conformance/internal" + "connectrpc.com/conformance/internal/compression" + conformancev1 "connectrpc.com/conformance/internal/gen/proto/go/connectrpc/conformance/v1" + "connectrpc.com/connect" ) const ( @@ -350,6 +353,32 @@ func (r *RequestCanceled) print(printer internal.Printer) { printer.Printf("%s %9.3fms canceled", requestPrefix, r.offsetMillis()) } +// GetDecompressor returns a decompressor that can handle the given encoding. +func GetDecompressor(encoding string) connect.Decompressor { + var comp conformancev1.Compression + switch strings.ToLower(encoding) { + case "", "identity": + comp = conformancev1.Compression_COMPRESSION_IDENTITY + case "gzip": + comp = conformancev1.Compression_COMPRESSION_GZIP + case "br": + comp = conformancev1.Compression_COMPRESSION_BR + case "zstd": + comp = conformancev1.Compression_COMPRESSION_ZSTD + case "deflate": + comp = conformancev1.Compression_COMPRESSION_DEFLATE + case "snappy": + comp = conformancev1.Compression_COMPRESSION_SNAPPY + default: + return brokenDecompressor{} + } + decomp, err := compression.GetDecompressor(comp) + if err != nil { + return brokenDecompressor{} + } + return decomp +} + type traceResult struct { trace Trace done chan struct{}