diff --git a/runner/worker.go b/runner/worker.go index ad6f1dcb..d371ef30 100644 --- a/runner/worker.go +++ b/runner/worker.go @@ -200,11 +200,11 @@ func (w *Worker) getMessages(ctd *callTemplateData, inputData []byte) ([]*dynami func (w *Worker) makeClientStreamingRequest(ctx *context.Context, input []*dynamic.Message) error { var str *grpcdynamic.ClientStream var err error + var callOptions = []grpc.CallOption{} if w.config.enableCompression { - str, err = w.stub.InvokeRpcClientStream(*ctx, w.mtd, grpc.UseCompressor(gzip.Name)) - } else { - str, err = w.stub.InvokeRpcClientStream(*ctx, w.mtd) + callOptions = append(callOptions, grpc.UseCompressor(gzip.Name)) } + str, err = w.stub.InvokeRpcClientStream(*ctx, w.mtd, callOptions...) if err != nil && w.config.hasLog { w.config.log.Errorw("Invoke Client Streaming RPC call error: "+err.Error(), "workerID", w.workerID, @@ -275,7 +275,11 @@ func (w *Worker) makeClientStreamingRequest(ctx *context.Context, input []*dynam } func (w *Worker) makeServerStreamingRequest(ctx *context.Context, input *dynamic.Message) error { - str, err := w.stub.InvokeRpcServerStream(*ctx, w.mtd, input, grpc.UseCompressor(gzip.Name)) + var callOptions = []grpc.CallOption{} + if w.config.enableCompression { + callOptions = append(callOptions, grpc.UseCompressor(gzip.Name)) + } + str, err := w.stub.InvokeRpcServerStream(*ctx, w.mtd, input, callOptions...) if err != nil && w.config.hasLog { w.config.log.Errorw("Invoke Server Streaming RPC call error: "+err.Error(), "workerID", w.workerID, @@ -307,11 +311,11 @@ func (w *Worker) makeServerStreamingRequest(ctx *context.Context, input *dynamic func (w *Worker) makeBidiRequest(ctx *context.Context, input []*dynamic.Message) error { var str *grpcdynamic.BidiStream var err error + var callOptions = []grpc.CallOption{} if w.config.enableCompression { - str, err = w.stub.InvokeRpcBidiStream(*ctx, w.mtd, grpc.UseCompressor(gzip.Name)) - } else { - str, err = w.stub.InvokeRpcBidiStream(*ctx, w.mtd) + callOptions = append(callOptions, grpc.UseCompressor(gzip.Name)) } + str, err = w.stub.InvokeRpcBidiStream(*ctx, w.mtd, callOptions...) if err != nil { if w.config.hasLog {