Skip to content

Commit

Permalink
referenceclient: inspecting wire details needs to support compressed …
Browse files Browse the repository at this point in the history
…error bodies and we should not bother comparing debug in error JSON if value could not be decoded
  • Loading branch information
jhump committed Feb 27, 2024
1 parent af8590e commit ae162fb
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
name: Connect Compressed Error and End-Stream
mode: TEST_MODE_CLIENT
relevantProtocols:
- PROTOCOL_CONNECT
relevantCompressions:
# Ideally, we'd run this sort of test for any/all compression encodings
# supported by the client. But, since it uses a raw HTTP response, we
# have to hard-code the compression (raw response config doesn't allow
# parameterizing the encoding). So we test with the most likely encoding
# to be implemented.
- COMPRESSION_GZIP
relevantCodecs:
- CODEC_PROTO
testCases:
- request:
testName: error/compressed
service: connectrpc.conformance.v1.ConformanceService
method: Unary
streamType: STREAM_TYPE_UNARY
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.UnaryRequest
responseDefinition:
rawResponse:
statusCode: 422
headers:
- name: content-type
value: [ "application/json" ]
- name: content-encoding
value: [ "gzip" ]
unary:
text: |
{
"code": "out_of_range",
"message": "oops",
"details": [
{
"type": "google.protobuf.FileDescriptorProto",
"value": "Cgp0ZXN0LnByb3Rv",
"debug": { "name": "test.proto" }
}
]
}
compression: COMPRESSION_GZIP
expectedResponse:
error:
code: CODE_OUT_OF_RANGE
message: oops
details:
- "@type": type.googleapis.com/google.protobuf.FileDescriptorProto
name: "test.proto"

- request:
testName: end-stream/compressed
service: connectrpc.conformance.v1.ConformanceService
method: ServerStream
streamType: STREAM_TYPE_SERVER_STREAM
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.ServerStreamRequest
responseDefinition:
rawResponse:
statusCode: 200
headers:
- name: content-type
value: [ "application/connect+proto" ]
- name: connect-content-encoding
value: [ "gzip" ]
stream:
items:
- flags: 3
payload:
text: |
{
"error": {
"code": "out_of_range",
"message": "oops",
"foobar": "baz",
"details": [
{
"type": "google.protobuf.FileDescriptorProto",
"value": "Cgp0ZXN0LnByb3Rv",
"debug": { "name": "test.proto" }
}
]
}
}
compression: COMPRESSION_GZIP
expectedResponse:
error:
code: CODE_OUT_OF_RANGE
message: oops
details:
- "@type": type.googleapis.com/google.protobuf.FileDescriptorProto
name: "test.proto"
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
name: gRPC-Web Compressed Trailers
mode: TEST_MODE_CLIENT
relevantProtocols:
- PROTOCOL_GRPC_WEB
relevantCompressions:
# Ideally, we'd run this sort of test for any/all compression encodings
# supported by the client. But, since it uses a raw HTTP response, we
# have to hard-code the compression (raw response config doesn't allow
# parameterizing the encoding). So we test with the most likely encoding
# to be implemented.
- COMPRESSION_GZIP
relevantCodecs:
- CODEC_PROTO
testCases:
- request:
testName: trailers-in-body/compressed
streamType: STREAM_TYPE_UNARY
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.UnaryRequest
responseDefinition:
rawResponse:
status_code: 200
headers:
- name: "access-control-allow-origin"
value: [ "*" ]
- name: "access-control-expose-headers"
value: [ "*" ]
- name: content-type
value: [ "application/grpc-web" ]
- name: grpc-encoding
value: [ "gzip" ]
- name: x-custom-header
value: [ "foo" ]
stream:
items:
- flags: 129
payload:
text: "grpc-status: 9\r\ngrpc-message: error\r\nx-custom-trailer: bing\r\n"
compression: COMPRESSION_GZIP
expectedResponse:
responseHeaders:
- name: x-custom-header
value: [ "foo" ]
error:
code: 9
message: error
responseTrailers:
- name: x-custom-trailer
value: [ "bing" ]
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ relevantProtocols:
- PROTOCOL_GRPC_WEB
relevantCodecs:
- CODEC_PROTO
relevantCompressions:
- COMPRESSION_IDENTITY
# These tests verify that a gRPC-Web client can handle trailers in the body with
# no response, trailers-only responses (trailers in headers), and trailers with
# different cases (in addition to the "standard" all lower-case).
Expand Down
8 changes: 7 additions & 1 deletion internal/app/referenceclient/wire_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ func examineWireDetails(ctx context.Context, printer internal.Printer) (statusCo
case isUnaryJSONError(contentType, trace.Response.StatusCode):
// If this is a unary request that returned an error, then use the entire
// response body as the wire error details.
examineConnectError(wrapper.buf.Bytes(), printer)
decomp := tracer.GetDecompressor(trace.Response.Header.Get("content-encoding"))
if err := decomp.Reset(wrapper.buf); err == nil {
if body, err := io.ReadAll(decomp); err == nil {
examineConnectError(body, printer)
}
}
case strings.HasPrefix(contentType, "application/connect+"):
// If this is a streaming Connect request, then look through the trace events
// for the ResponseBodyEndStream event and parse its content into an
Expand Down Expand Up @@ -314,6 +319,7 @@ func examineConnectErrorDetail(i int, detailJSON json.RawMessage, printer intern
decoded, err := base64.RawStdEncoding.DecodeString(str)
if err != nil {
printer.Printf(`%s: value for key "value", %q, is not valid unpadded base64-encoding: %v`, prefix, val, err)
detail.Value = nil // this will skip the comparison of "value" and "debug" info below
break
}
decodedVal = decoded
Expand Down
82 changes: 75 additions & 7 deletions internal/app/referenceclient/wire_details_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package referenceclient

import (
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"encoding/json"
Expand All @@ -36,6 +38,7 @@ func TestExamineConnectError(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
compressed bool
endStream string
expectedFeedback []string
}{
Expand Down Expand Up @@ -67,6 +70,15 @@ func TestExamineConnectError(t *testing.T) {
]
}`,
},
{
name: "compressed",
compressed: true,
endStream: `
{
"code": "internal",
"message": "blah blah blah"
}`,
},
{
name: "complete",
endStream: `
Expand Down Expand Up @@ -287,8 +299,17 @@ func TestExamineConnectError(t *testing.T) {
t.Parallel()
svr := httptest.NewServer(http.HandlerFunc(func(respWriter http.ResponseWriter, req *http.Request) {
respWriter.Header().Set("Content-Type", "application/json")
if testCase.compressed {
respWriter.Header().Set("Content-Encoding", "gzip")
}
respWriter.WriteHeader(http.StatusBadRequest)
_, _ = respWriter.Write([]byte(testCase.endStream))
if testCase.compressed {
w := gzip.NewWriter(respWriter)
_, _ = w.Write([]byte(testCase.endStream))
_ = w.Close()
} else {
_, _ = respWriter.Write([]byte(testCase.endStream))
}
}))
t.Cleanup(svr.Close)
client := conformancev1connect.NewConformanceServiceClient(
Expand Down Expand Up @@ -326,6 +347,7 @@ func TestExamineConnectEndStream(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
compressed bool
endStream string
expectedFeedback []string
}{
Expand All @@ -344,6 +366,17 @@ func TestExamineConnectEndStream(t *testing.T) {
endStream: `
{"error":{"code":"canceled"}}`,
},
{
name: "compressed",
compressed: true,
endStream: `
{
"error": {"code": "invalid_argument", "message": "foobar"},
"metadata":{
"foo": ["bar", "baz"]
}
}`,
},
{
name: "nulls",
endStream: `
Expand Down Expand Up @@ -426,11 +459,25 @@ func TestExamineConnectEndStream(t *testing.T) {
t.Parallel()
svr := httptest.NewServer(http.HandlerFunc(func(respWriter http.ResponseWriter, req *http.Request) {
respWriter.Header().Set("Content-Type", "application/connect+proto")
_, _ = respWriter.Write([]byte{2}) // just the end-stream flag
if testCase.compressed {
respWriter.Header().Set("Connect-Content-Encoding", "gzip")
_, _ = respWriter.Write([]byte{3}) // end-stream + compressed flags
} else {
_, _ = respWriter.Write([]byte{2}) // just the end-stream flag
}

data := []byte(testCase.endStream)
if testCase.compressed {
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
_, _ = w.Write([]byte(testCase.endStream))
_ = w.Close()
data = buf.Bytes()
}
var size [4]byte
binary.BigEndian.PutUint32(size[:], uint32(len(testCase.endStream)))
binary.BigEndian.PutUint32(size[:], uint32(len(data)))
_, _ = respWriter.Write(size[:])
_, _ = respWriter.Write([]byte(testCase.endStream))
_, _ = respWriter.Write(data)
}))
t.Cleanup(svr.Close)
client := conformancev1connect.NewConformanceServiceClient(
Expand Down Expand Up @@ -468,6 +515,7 @@ func TestExamineGRPCEndStream(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
compressed bool
endStream string
expectedFeedback []string
}{
Expand All @@ -477,6 +525,13 @@ func TestExamineGRPCEndStream(t *testing.T) {
"grpc-message: foo\r\n" +
"blah-blah: foobar\r\n",
},
{
name: "compressed",
compressed: true,
endStream: "grpc-status: 6\r\n" +
"grpc-message: foo\r\n" +
"blah-blah: foobar\r\n",
},
{
name: "allowed special chars in key",
endStream: "grpc-status: 6\r\n" +
Expand Down Expand Up @@ -584,11 +639,24 @@ func TestExamineGRPCEndStream(t *testing.T) {
t.Parallel()
svr := httptest.NewServer(http.HandlerFunc(func(respWriter http.ResponseWriter, req *http.Request) {
respWriter.Header().Set("Content-Type", "application/grpc-web")
_, _ = respWriter.Write([]byte{128}) // just the end-stream flag
if testCase.compressed {
respWriter.Header().Set("Grpc-Encoding", "gzip")
_, _ = respWriter.Write([]byte{129}) // end-stream + compressed flags
} else {
_, _ = respWriter.Write([]byte{128}) // just the end-stream flag
}
data := []byte(testCase.endStream)
if testCase.compressed {
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
_, _ = w.Write([]byte(testCase.endStream))
_ = w.Close()
data = buf.Bytes()
}
var size [4]byte
binary.BigEndian.PutUint32(size[:], uint32(len(testCase.endStream)))
binary.BigEndian.PutUint32(size[:], uint32(len(data)))
_, _ = respWriter.Write(size[:])
_, _ = respWriter.Write([]byte(testCase.endStream))
_, _ = respWriter.Write(data)
}))
t.Cleanup(svr.Close)
client := conformancev1connect.NewConformanceServiceClient(
Expand Down
31 changes: 2 additions & 29 deletions internal/tracer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"sync"
"sync/atomic"

"connectrpc.com/conformance/internal/compression"
conformancev1 "connectrpc.com/conformance/internal/gen/proto/go/connectrpc/conformance/v1"
"connectrpc.com/connect"
)

Expand Down Expand Up @@ -292,37 +290,12 @@ func propertiesFromHeaders(headers http.Header) (isStream bool, decomp connect.D
}
switch {
case strings.HasPrefix(contentType, "application/connect"):
return true, getDecompressor(headers.Get("Connect-Content-Encoding"))
return true, GetDecompressor(headers.Get("Connect-Content-Encoding"))
case strings.HasPrefix(contentType, "application/grpc"):
return true, getDecompressor(headers.Get("Grpc-Encoding"))
return true, GetDecompressor(headers.Get("Grpc-Encoding"))
default:
// We should only need a decompressor for streams (to decompress the end-stream message)
// So for non-stream protocols, this no-op decompressor should suffice.
return false, brokenDecompressor{}
}
}

func getDecompressor(encoding string) connect.Decompressor {
var comp conformancev1.Compression
switch strings.ToLower(encoding) {
case "", "identity":
comp = conformancev1.Compression_COMPRESSION_IDENTITY
case "gzip":
comp = conformancev1.Compression_COMPRESSION_GZIP
case "br":
comp = conformancev1.Compression_COMPRESSION_BR
case "zstd":
comp = conformancev1.Compression_COMPRESSION_ZSTD
case "deflate":
comp = conformancev1.Compression_COMPRESSION_DEFLATE
case "snappy":
comp = conformancev1.Compression_COMPRESSION_SNAPPY
default:
return brokenDecompressor{}
}
decomp, err := compression.GetDecompressor(comp)
if err != nil {
return brokenDecompressor{}
}
return decomp
}
Loading

0 comments on commit ae162fb

Please sign in to comment.