Skip to content

Commit

Permalink
Merge pull request #342 from yarpc/yabrelease0.20.0
Browse files Browse the repository at this point in the history
Yab Release v0.20.0
  • Loading branch information
Alexandre Wilhelm authored May 18, 2021
2 parents 8edf8ca + fa1da60 commit 5f8e469
Show file tree
Hide file tree
Showing 14 changed files with 558 additions and 78 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Changelog
=========

# 0.20.0 (2021-05-18)
* Add `stream-delay-close-send` option which delays client send stream closure.
* New: gRPC details are now printed along with the error if there are any.

# 0.19.1 (2021-04-02)
* Fix byte parsing to allow 8-bit signed integers to match the Thrift spec & other language implementations.

Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# yab [![Build Status][ci-img]][ci] [![Coverage Status][cov-img]][cov]

`yab` (Yet Another Benchmarker) is a tool to benchmark YARPC services. It
currently supports making Thrift requests to both HTTP and TChannel services.
currently supports making Thrift requests to both HTTP and [TChannel](https://github.com/uber/tchannel) services, as well as Protobuf requests to [gRPC](https://grpc.io/) services.

`yab` is currently in **beta** status.

Expand Down Expand Up @@ -73,7 +73,11 @@ Request Options:
--multiplexed-thrift Enables the Thrift TMultiplexedProtocol used
by services that host multiple Thrift services
on a single endpoint.
--stream-interval= Interval between consecutive stream message sends, applicable separately to every stream request opened on a connection.
--stream-interval= Interval between consecutive stream message sends,
applicable separately to every stream request
opened on a connection.
--stream-delay-close-send= Delay the closure of send stream once all the
request messages have been sent.
Transport Options:
-s, --service= The TChannel/Hyperbahn service name
Expand Down
7 changes: 7 additions & 0 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ type Serializer interface {
MethodType() MethodType
}

// ProtoErrorDeserializer deserializes errors.
// Error details are very specific to proto.
type ProtoErrorDeserializer interface {
// Error converts an error into something that can be displayed to a user.
ErrorDetails(err error) ([]interface{}, error)
}

// StreamRequestReader interface exposes method to read multiple request body
type StreamRequestReader interface {
// NextBody returns the encoded request body if available, and if not, returns an
Expand Down
45 changes: 45 additions & 0 deletions encoding/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"go.uber.org/yarpc/pkg/procedure"
"go.uber.org/yarpc/yarpcerrors"
"google.golang.org/genproto/googleapis/rpc/status"
)

type protoSerializer struct {
Expand Down Expand Up @@ -106,6 +108,49 @@ func (p protoSerializer) Encoding() Encoding {
return Protobuf
}

func (p protoSerializer) ErrorDetails(err error) ([]interface{}, error) {
// Here we use yarpcerrors since the transport layer of yab is using yarpc as well
if !yarpcerrors.IsStatus(err) {
return nil, nil
}

yerr := yarpcerrors.FromError(err)
if len(yerr.Details()) == 0 {
return nil, nil
}

errStatus := &status.Status{}
if err := proto.Unmarshal(yerr.Details(), errStatus); err != nil {
return nil, fmt.Errorf("could not unmarshal error details %s", err.Error())
}

details := []interface{}{}
for _, detail := range errStatus.Details {
// By default we set to the value of the proto detail message to its byte values.
// It is possible that YAB will not be able to resolve the type message.
// This can happen when an error is being bubbled up in a chain of services.
// For instance, let's say we have A -> B -> C.
// If A does not have registered detail messages descriptors from C and B blindly bubbled up
// errors from C, YAB will not be able to resolve the type of the details based
// on the descriptors given by A (through the reflection server).
var value interface{} = detail.Value

rdetail, rerr := p.anyResolver.Resolve(detail.TypeUrl)
if rerr == nil {
if err := proto.Unmarshal(detail.Value, rdetail); err != nil {
return nil, fmt.Errorf("could not unmarshal error detail message %s", err.Error())
}
value = rdetail
}

details = append(details, map[string]interface{}{
detail.TypeUrl: value,
})
}

return details, nil
}

func (p protoSerializer) MethodType() MethodType {
if p.method.IsClientStreaming() && p.method.IsServerStreaming() {
return BidirectionalStream
Expand Down
134 changes: 134 additions & 0 deletions encoding/protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/jhump/protoreflect/dynamic"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/yarpc/yarpcerrors"
"google.golang.org/genproto/googleapis/rpc/status"
)

func TestNewProtobuf(t *testing.T) {
Expand Down Expand Up @@ -159,6 +161,138 @@ func TestProtobufRequest(t *testing.T) {
}
}

func TestProtobufErrorDetails(t *testing.T) {
source, err := protobuf.NewDescriptorProviderFileDescriptorSetBins("../testdata/protobuf/simple/simple.proto.bin")
require.NoError(t, err)
anySource, err := protobuf.NewDescriptorProviderFileDescriptorSetBins("../testdata/protobuf/any/any.proto.bin")
require.NoError(t, err)

grpcStatusBytesFunc := func(typeUrl string, msg proto.Message) []byte {
valueContent, err := proto.Marshal(msg)
require.NoError(t, err)

a := &any.Any{
TypeUrl: typeUrl,
Value: valueContent,
}

s := &status.Status{}
s.Details = []*any.Any{a}
bytes, err := proto.Marshal(s)
require.NoError(t, err)
return bytes
}

grpcStatusCorrupedBytesFunc := func(typeUrl string, msg proto.Message) []byte {
valueContent, err := proto.Marshal(msg)
require.NoError(t, err)

a := &any.Any{
TypeUrl: typeUrl,
Value: valueContent[:1],
}

s := &status.Status{}
s.Details = []*any.Any{a}
bytes, err := proto.Marshal(s)
require.NoError(t, err)
return bytes
}

tests := []struct {
desc string
method string
source protobuf.DescriptorProvider
err error
wantOutAsJSON string
wantErr error
}{
{
desc: "nil error",
source: source,
method: "Bar/Baz",
err: nil,
wantErr: nil,
wantOutAsJSON: "null",
},
{
desc: "std error",
source: source,
method: "Bar/Baz",
err: fmt.Errorf("this is a test error"),
wantErr: nil,
wantOutAsJSON: "null",
},
{
desc: "yarpc error with no details",
source: source,
method: "Bar/Baz",
err: yarpcerrors.FromError(fmt.Errorf("this is a test error")),
wantErr: nil,
wantOutAsJSON: "null",
},
{
desc: "yarpc error bad details bytes",
source: source,
method: "Bar/Baz",
err: yarpcerrors.FromError(fmt.Errorf("this is a test error")).WithDetails([]byte{0x8, 0x1, 0x12}),
wantErr: fmt.Errorf("could not unmarshal error details unexpected EOF"),
wantOutAsJSON: "null",
},
{
desc: "yarpc error with 1 detail",
source: anySource,
method: "BarAny/BazAny",
err: yarpcerrors.FromError(fmt.Errorf("this is a test error")).WithDetails(grpcStatusBytesFunc("type.googleapis.com/FooAny", &tany.FooAny{
Value: 10,
})),
wantErr: nil,
wantOutAsJSON: `[{"type.googleapis.com/FooAny":{"value":10}}]`,
},
{
desc: "yarpc error with detail which can not be resolved",
source: anySource,
method: "BarAny/BazAny",
err: yarpcerrors.FromError(fmt.Errorf("this is a test error")).WithDetails(grpcStatusBytesFunc("uri/not/identifiable", &tany.FooAny{
Value: 10,
})),
wantErr: nil,
wantOutAsJSON: `[{"uri/not/identifiable":{"V":"CAo="}}]`,
},
{
desc: "yarpc error with detail with corrupted bytes in any detail message",
source: anySource,
method: "BarAny/BazAny",
err: yarpcerrors.FromError(fmt.Errorf("this is a test error")).WithDetails(grpcStatusCorrupedBytesFunc("type.googleapis.com/FooAny", &tany.FooAny{
Value: 10,
})),
wantErr: fmt.Errorf("could not unmarshal error detail message unexpected EOF"),
wantOutAsJSON: ``,
},
}

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
serializer, err := NewProtobuf(tt.method, tt.source)
require.NoError(t, err, "Failed to create serializer")
errorSerializer, ok := serializer.(ProtoErrorDeserializer)
require.True(t, ok)

result, err := errorSerializer.ErrorDetails(tt.err)

if tt.wantErr != nil {
require.Error(t, err, tt.desc)
assert.Contains(t, err.Error(), tt.wantErr.Error(), "%v: invalid error", tt.desc)
} else {
require.NoError(t, err, tt.desc)
r, err := json.Marshal(result)
require.NoError(t, err)
assert.Equal(t, tt.wantOutAsJSON, string(r))
}
})
}
}

func TestProtobufResponse(t *testing.T) {
source, err := protobuf.NewDescriptorProviderFileDescriptorSetBins("../testdata/protobuf/simple/simple.proto.bin")
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
go.uber.org/zap v1.10.0
golang.org/x/net v0.0.0-20190926025831-c00fd9afed17
golang.org/x/text v0.3.1-0.20180511172408-5c1cf69b5978 // indirect
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 // indirect
google.golang.org/grpc v1.24.0
gopkg.in/yaml.v2 v2.2.2
)
Expand Down
16 changes: 12 additions & 4 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func makeClientStream(ctx context.Context, stream *yarpctransport.ClientStream,
}

if err == io.EOF {
err = closeSendStream(ctx, stream)
err = closeSendStream(ctx, stream, opts.DelayCloseSendStream.Duration())
}
if err != nil {
return err
Expand Down Expand Up @@ -241,7 +241,7 @@ func makeBidiStream(ctx context.Context, cancel context.CancelFunc, stream *yarp
var reqBody []byte
reqBody, err = streamIO.NextRequest()
if err == io.EOF {
err = closeSendStream(ctx, stream)
err = closeSendStream(ctx, stream, opts.DelayCloseSendStream.Duration())
break
}
if err != nil {
Expand Down Expand Up @@ -317,8 +317,16 @@ func receiveStreamMessage(ctx context.Context, stream *yarpctransport.ClientStre
}

// closeSendStream closes the stream from the client side while
// stream can continue to receive messages from server
func closeSendStream(ctx context.Context, stream *yarpctransport.ClientStream) error {
// stream can continue to receive messages from server. If non-zero delay is
// passed, stream is closed after the delay.
func closeSendStream(ctx context.Context, stream *yarpctransport.ClientStream, delayCloseSendStream time.Duration) error {
if delayCloseSendStream != 0 {
select {
case <-time.After(delayCloseSendStream):
case <-ctx.Done():
}
}

// YARPC stream.Close method internally invokes closeSend on gRPC clientStream.
if err := stream.Close(ctx); err != nil {
return fmt.Errorf("Failed to close send stream: %v", err)
Expand Down
68 changes: 68 additions & 0 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/yarpc/yab/encoding"
"go.uber.org/yarpc/api/transport"
)

type mockStreamReader struct {
Expand Down Expand Up @@ -152,3 +153,70 @@ func TestIntervalWaiter(t *testing.T) {
}
})
}

type mockStreamCloser struct {
closeErr error
}

func (m mockStreamCloser) Close(context.Context) error { return m.closeErr }

func (mockStreamCloser) Context() context.Context { return context.Background() }

func (mockStreamCloser) Request() *transport.StreamRequest { return nil }

func (mockStreamCloser) SendMessage(context.Context, *transport.StreamMessage) error { return nil }

func (mockStreamCloser) ReceiveMessage(context.Context) (*transport.StreamMessage, error) {
return nil, nil
}

func TestCloseSendStream(t *testing.T) {
tests := []struct {
description string
delay time.Duration
err error
}{
{
description: "close without delay",
},
{
description: "close with error",
err: errors.New("test error"),
},
{
description: "close with delay",
delay: time.Millisecond * 50,
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
stream, err := transport.NewClientStream(mockStreamCloser{closeErr: test.err})
require.NoError(t, err, "unexpected client stream error")

start := time.Now()
err = closeSendStream(context.Background(), stream, test.delay)
end := time.Now()

if test.err != nil {
require.Error(t, err, "unexpected nil error from close")
assert.Contains(t, err.Error(), test.err.Error(), "unexpected error message from close")
} else {
assert.NoError(t, err, "unexpected error from close")
}

assert.True(t, end.After(start.Add(test.delay)), "unexpected execution time")
})
}

t.Run("must not delay when context is done", func(t *testing.T) {
stream, err := transport.NewClientStream(mockStreamCloser{})
require.NoError(t, err, "unexpected client stream error")

ctx, cancel := context.WithCancel(context.Background())
cancel()
start := time.Now()
require.NoError(t, closeSendStream(ctx, stream, time.Second), "unexpected error from close")
assert.True(t, time.Now().Before(start.Add(time.Second)), "expected close to execute immediately")
})
}
Loading

0 comments on commit 5f8e469

Please sign in to comment.