Skip to content

Commit

Permalink
fix compression and call options for other calls
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Mar 16, 2020
1 parent bd585a0 commit 26c0b04
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions runner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,11 @@ func (w *Worker) getMessages(ctd *callTemplateData, inputData []byte) ([]*dynami
func (w *Worker) makeClientStreamingRequest(ctx *context.Context, input []*dynamic.Message) error {
var str *grpcdynamic.ClientStream
var err error
var callOptions = []grpc.CallOption{}
if w.config.enableCompression {
str, err = w.stub.InvokeRpcClientStream(*ctx, w.mtd, grpc.UseCompressor(gzip.Name))
} else {
str, err = w.stub.InvokeRpcClientStream(*ctx, w.mtd)
callOptions = append(callOptions, grpc.UseCompressor(gzip.Name))
}
str, err = w.stub.InvokeRpcClientStream(*ctx, w.mtd, callOptions...)

if err != nil && w.config.hasLog {
w.config.log.Errorw("Invoke Client Streaming RPC call error: "+err.Error(), "workerID", w.workerID,
Expand Down Expand Up @@ -275,7 +275,11 @@ func (w *Worker) makeClientStreamingRequest(ctx *context.Context, input []*dynam
}

func (w *Worker) makeServerStreamingRequest(ctx *context.Context, input *dynamic.Message) error {
str, err := w.stub.InvokeRpcServerStream(*ctx, w.mtd, input, grpc.UseCompressor(gzip.Name))
var callOptions = []grpc.CallOption{}
if w.config.enableCompression {
callOptions = append(callOptions, grpc.UseCompressor(gzip.Name))
}
str, err := w.stub.InvokeRpcServerStream(*ctx, w.mtd, input, callOptions...)

if err != nil && w.config.hasLog {
w.config.log.Errorw("Invoke Server Streaming RPC call error: "+err.Error(), "workerID", w.workerID,
Expand Down Expand Up @@ -307,11 +311,11 @@ func (w *Worker) makeServerStreamingRequest(ctx *context.Context, input *dynamic
func (w *Worker) makeBidiRequest(ctx *context.Context, input []*dynamic.Message) error {
var str *grpcdynamic.BidiStream
var err error
var callOptions = []grpc.CallOption{}
if w.config.enableCompression {
str, err = w.stub.InvokeRpcBidiStream(*ctx, w.mtd, grpc.UseCompressor(gzip.Name))
} else {
str, err = w.stub.InvokeRpcBidiStream(*ctx, w.mtd)
callOptions = append(callOptions, grpc.UseCompressor(gzip.Name))
}
str, err = w.stub.InvokeRpcBidiStream(*ctx, w.mtd, callOptions...)

if err != nil {
if w.config.hasLog {
Expand Down

0 comments on commit 26c0b04

Please sign in to comment.