Skip to content

Commit

Permalink
add gzip support (#164)
Browse files Browse the repository at this point in the history
* add gzip support

* add enable-compression flag and grpc-accept-encoding metadata to requests in case it is set

* use var arg for unary requests and complete doc
  • Loading branch information
pgehin-leansys authored Mar 16, 2020
1 parent ab154b0 commit bd585a0
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 46 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Flags:
--cpus=12 Number of cpu cores to use.
--debug= The path to debug log file.
-v, --version Show application version.
-e, --enable-compression Enable Gzip compression on requests.
Args:
[<host>] Host and port to test.
Expand Down
73 changes: 37 additions & 36 deletions cmd/ghz/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,42 +32,43 @@ func (d Duration) String() string {

// config for the run.
type config struct {
Proto string `json:"proto" toml:"proto" yaml:"proto"`
Protoset string `json:"protoset" toml:"protoset" yaml:"protoset"`
Call string `json:"call" toml:"call" yaml:"call" required:"true"`
RootCert string `json:"cacert" toml:"cacert" yaml:"cacert"`
Cert string `json:"cert" toml:"cert" yaml:"cert"`
Key string `json:"key" toml:"key" yaml:"key"`
SkipTLSVerify bool `json:"skipTLS" toml:"skipTLS" yaml:"skipTLS"`
CName string `json:"cname" toml:"cname" yaml:"cname"`
Authority string `json:"authority" toml:"authority" yaml:"authority"`
Insecure bool `json:"insecure,omitempty" toml:"insecure,omitempty" yaml:"insecure,omitempty"`
N uint `json:"total" toml:"total" yaml:"total" default:"200"`
C uint `json:"concurrency" toml:"concurrency" yaml:"concurrency" default:"50"`
Connections uint `json:"connections" toml:"connections" yaml:"connections" default:"1"`
QPS uint `json:"qps" toml:"qps" yaml:"qps"`
Z Duration `json:"duration" toml:"duration" yaml:"duration"`
ZStop string `json:"duration-stop" toml:"duration-stop" yaml:"duration-stop" default:"close"`
X Duration `json:"max-duration" toml:"max-duration" yaml:"max-duration"`
Timeout Duration `json:"timeout" toml:"timeout" yaml:"timeout" default:"20s"`
Data interface{} `json:"data,omitempty" toml:"data,omitempty" yaml:"data,omitempty"`
DataPath string `json:"data-file" toml:"data-file" yaml:"data-file"`
BinData []byte `json:"-" toml:"-" yaml:"-"`
BinDataPath string `json:"binary-file" toml:"binary-file" yaml:"binary-file"`
Metadata *map[string]string `json:"metadata,omitempty" toml:"metadata,omitempty" yaml:"metadata,omitempty"`
MetadataPath string `json:"metadata-file" toml:"metadata-file" yaml:"metadata-file"`
SI Duration `json:"stream-interval" toml:"stream-interval" yaml:"stream-interval"`
Output string `json:"output" toml:"output" yaml:"output"`
Format string `json:"format" toml:"format" yaml:"format" default:"summary"`
DialTimeout Duration `json:"connect-timeout" toml:"connect-timeout" yaml:"connect-timeout" default:"10s"`
KeepaliveTime Duration `json:"keepalive" toml:"keepalive" yaml:"keepalive"`
CPUs uint `json:"cpus" toml:"cpus" yaml:"cpus"`
ImportPaths []string `json:"import-paths,omitempty" toml:"import-paths,omitempty" yaml:"import-paths,omitempty"`
Name string `json:"name,omitempty" toml:"name,omitempty" yaml:"name,omitempty"`
Tags *map[string]string `json:"tags,omitempty" toml:"tags,omitempty" yaml:"tags,omitempty"`
ReflectMetadata *map[string]string `json:"reflect-metadata,omitempty" toml:"reflect-metadata,omitempty" yaml:"reflect-metadata,omitempty"`
Debug string `json:"debug,omitempty" toml:"debug,omitempty" yaml:"debug,omitempty"`
Host string `json:"host" toml:"host" yaml:"host"`
Proto string `json:"proto" toml:"proto" yaml:"proto"`
Protoset string `json:"protoset" toml:"protoset" yaml:"protoset"`
Call string `json:"call" toml:"call" yaml:"call" required:"true"`
RootCert string `json:"cacert" toml:"cacert" yaml:"cacert"`
Cert string `json:"cert" toml:"cert" yaml:"cert"`
Key string `json:"key" toml:"key" yaml:"key"`
SkipTLSVerify bool `json:"skipTLS" toml:"skipTLS" yaml:"skipTLS"`
CName string `json:"cname" toml:"cname" yaml:"cname"`
Authority string `json:"authority" toml:"authority" yaml:"authority"`
Insecure bool `json:"insecure,omitempty" toml:"insecure,omitempty" yaml:"insecure,omitempty"`
N uint `json:"total" toml:"total" yaml:"total" default:"200"`
C uint `json:"concurrency" toml:"concurrency" yaml:"concurrency" default:"50"`
Connections uint `json:"connections" toml:"connections" yaml:"connections" default:"1"`
QPS uint `json:"qps" toml:"qps" yaml:"qps"`
Z Duration `json:"duration" toml:"duration" yaml:"duration"`
ZStop string `json:"duration-stop" toml:"duration-stop" yaml:"duration-stop" default:"close"`
X Duration `json:"max-duration" toml:"max-duration" yaml:"max-duration"`
Timeout Duration `json:"timeout" toml:"timeout" yaml:"timeout" default:"20s"`
Data interface{} `json:"data,omitempty" toml:"data,omitempty" yaml:"data,omitempty"`
DataPath string `json:"data-file" toml:"data-file" yaml:"data-file"`
BinData []byte `json:"-" toml:"-" yaml:"-"`
BinDataPath string `json:"binary-file" toml:"binary-file" yaml:"binary-file"`
Metadata *map[string]string `json:"metadata,omitempty" toml:"metadata,omitempty" yaml:"metadata,omitempty"`
MetadataPath string `json:"metadata-file" toml:"metadata-file" yaml:"metadata-file"`
SI Duration `json:"stream-interval" toml:"stream-interval" yaml:"stream-interval"`
Output string `json:"output" toml:"output" yaml:"output"`
Format string `json:"format" toml:"format" yaml:"format" default:"summary"`
DialTimeout Duration `json:"connect-timeout" toml:"connect-timeout" yaml:"connect-timeout" default:"10s"`
KeepaliveTime Duration `json:"keepalive" toml:"keepalive" yaml:"keepalive"`
CPUs uint `json:"cpus" toml:"cpus" yaml:"cpus"`
ImportPaths []string `json:"import-paths,omitempty" toml:"import-paths,omitempty" yaml:"import-paths,omitempty"`
Name string `json:"name,omitempty" toml:"name,omitempty" yaml:"name,omitempty"`
Tags *map[string]string `json:"tags,omitempty" toml:"tags,omitempty" yaml:"tags,omitempty"`
ReflectMetadata *map[string]string `json:"reflect-metadata,omitempty" toml:"reflect-metadata,omitempty" yaml:"reflect-metadata,omitempty"`
Debug string `json:"debug,omitempty" toml:"debug,omitempty" yaml:"debug,omitempty"`
Host string `json:"host" toml:"host" yaml:"host"`
EnableCompression bool `json:"enable-compression,omitempty" toml:"enable-compression,omitempty" yaml:"enable-compression,omitempty"`
}

// UnmarshalJSON is our custom implementation to handle the Duration fields
Expand Down
6 changes: 6 additions & 0 deletions cmd/ghz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ var (

isHostSet = false
host = kingpin.Arg("host", "Host and port to test.").String()

isEnableCompressionSet = false
enableCompression = kingpin.Flag("enable-compression", "Enable Gzip compression on requests.").
Short('e').Default("false").IsSetByUser(&isEnableCompressionSet).Bool()
)

func main() {
Expand Down Expand Up @@ -245,6 +249,7 @@ func main() {
runner.WithStreamInterval(time.Duration(cfg.SI)),
runner.WithReflectionMetadata(cfg.ReflectMetadata),
runner.WithConnections(cfg.Connections),
runner.WithEnableCompression(cfg.EnableCompression),
)

if strings.TrimSpace(cfg.MetadataPath) != "" {
Expand Down Expand Up @@ -432,6 +437,7 @@ func createConfigFromArgs(cfg *config) error {
cfg.Tags = &tagsMap
cfg.ReflectMetadata = &rmdMap
cfg.Debug = *debug
cfg.EnableCompression = *enableCompression

return nil
}
Expand Down
21 changes: 16 additions & 5 deletions runner/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (
// RunConfig represents the request Configs
type RunConfig struct {
// call settings
call string
host string
proto string
importPaths []string
protoset string
call string
host string
proto string
importPaths []string
protoset string
enableCompression bool

// security settings
creds credentials.TransportCredentials
Expand Down Expand Up @@ -537,6 +538,16 @@ func newConfig(call, host string, options ...Option) (*RunConfig, error) {
return c, nil
}

// WithEnableCompression specifies that requests should be done using gzip Compressor
// WithEnableCompression(true)
func WithEnableCompression(enableCompression bool) Option {
return func(o *RunConfig) error {
o.enableCompression = enableCompression

return nil
}
}

func createClientTransportCredentials(skipVerify bool, cacertFile, clientCertFile, clientKeyFile, cname string) (credentials.TransportCredentials, error) {
var tlsConf tls.Config

Expand Down
8 changes: 8 additions & 0 deletions runner/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestRunConfig_newRunConfig(t *testing.T) {
assert.Equal(t, "testdata/data.proto", string(c.proto))
assert.Equal(t, "", string(c.protoset))
assert.Equal(t, []string{"testdata", "."}, c.importPaths)
assert.Equal(t, c.enableCompression, false)
})

t.Run("with options", func(t *testing.T) {
Expand Down Expand Up @@ -100,6 +101,7 @@ func TestRunConfig_newRunConfig(t *testing.T) {
assert.Equal(t, "testdata/data.proto", string(c.proto))
assert.Equal(t, "", string(c.protoset))
assert.Equal(t, []string{"testdata", ".", "/home/protos"}, c.importPaths)
assert.Equal(t, c.enableCompression, false)
})

t.Run("with binary data, protoset and metadata file", func(t *testing.T) {
Expand Down Expand Up @@ -146,6 +148,7 @@ func TestRunConfig_newRunConfig(t *testing.T) {
assert.Equal(t, "", string(c.proto))
assert.Equal(t, "testdata/bundle.protoset", string(c.protoset))
assert.NotNil(t, c.creds)
assert.Equal(t, c.enableCompression, false)
})

t.Run("with data interface and metadata map", func(t *testing.T) {
Expand Down Expand Up @@ -216,6 +219,7 @@ func TestRunConfig_newRunConfig(t *testing.T) {
assert.Equal(t, []string{"testdata", "."}, c.importPaths)
assert.NotNil(t, c.creds)
assert.Equal(t, map[string]string{"auth": "bizbaz"}, *c.rmd)
assert.Equal(t, c.enableCompression, false)
})

t.Run("with binary data from file", func(t *testing.T) {
Expand Down Expand Up @@ -244,6 +248,7 @@ func TestRunConfig_newRunConfig(t *testing.T) {
assert.Equal(t, "testdata/data.proto", string(c.proto))
assert.Equal(t, "", string(c.protoset))
assert.Equal(t, []string{"testdata", "."}, c.importPaths)
assert.Equal(t, c.enableCompression, false)
})

t.Run("with data from file", func(t *testing.T) {
Expand Down Expand Up @@ -273,6 +278,7 @@ func TestRunConfig_newRunConfig(t *testing.T) {
assert.Equal(t, "testdata/data.proto", string(c.proto))
assert.Equal(t, "", string(c.protoset))
assert.Equal(t, []string{"testdata", "."}, c.importPaths)
assert.Equal(t, c.enableCompression, false)
})

t.Run("with data from reader", func(t *testing.T) {
Expand Down Expand Up @@ -307,6 +313,7 @@ func TestRunConfig_newRunConfig(t *testing.T) {
assert.Equal(t, "testdata/data.proto", string(c.proto))
assert.Equal(t, "", string(c.protoset))
assert.Equal(t, []string{"testdata", "."}, c.importPaths)
assert.Equal(t, c.enableCompression, false)
})

t.Run("with connections", func(t *testing.T) {
Expand Down Expand Up @@ -342,6 +349,7 @@ func TestRunConfig_newRunConfig(t *testing.T) {
assert.Equal(t, "testdata/data.proto", string(c.proto))
assert.Equal(t, "", string(c.protoset))
assert.Equal(t, []string{"testdata", "."}, c.importPaths)
assert.Equal(t, c.enableCompression, false)
})

t.Run("with invalid connections > concurrency", func(t *testing.T) {
Expand Down
38 changes: 33 additions & 5 deletions runner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"sync/atomic"
"time"

"github.com/gogo/protobuf/proto"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/jhump/protoreflect/dynamic/grpcdynamic"
"go.uber.org/multierr"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
)

Expand Down Expand Up @@ -88,6 +91,12 @@ func (w *Worker) makeRequest() error {
if mdMap != nil && len(*mdMap) > 0 {
md := metadata.New(*mdMap)
reqMD = &md
} else {
reqMD = &metadata.MD{}
}

if w.config.enableCompression {
reqMD.Append("grpc-accept-encoding", gzip.Name)
}

ctx := context.Background()
Expand Down Expand Up @@ -122,7 +131,6 @@ func (w *Worker) makeRequest() error {
}

// RPC errors are handled via stats handler

if w.mtd.IsClientStreaming() && w.mtd.IsServerStreaming() {
_ = w.makeBidiRequest(&ctx, inputs)
} else if w.mtd.IsClientStreaming() {
Expand All @@ -138,7 +146,14 @@ func (w *Worker) makeRequest() error {
if w.mtd.IsServerStreaming() {
_ = w.makeServerStreamingRequest(&ctx, inputs[inputIdx])
} else {
res, resErr := w.stub.InvokeRpc(ctx, w.mtd, inputs[inputIdx])
var res proto.Message
var resErr error
var callOptions = []grpc.CallOption{}
if w.config.enableCompression {
callOptions = append(callOptions, grpc.UseCompressor(gzip.Name))
}

res, resErr = w.stub.InvokeRpc(ctx, w.mtd, inputs[inputIdx], callOptions...)

if w.config.hasLog {
w.config.log.Debugw("Received response", "workerID", w.workerID, "call type", callType,
Expand Down Expand Up @@ -183,7 +198,13 @@ func (w *Worker) getMessages(ctd *callTemplateData, inputData []byte) ([]*dynami
}

func (w *Worker) makeClientStreamingRequest(ctx *context.Context, input []*dynamic.Message) error {
str, err := w.stub.InvokeRpcClientStream(*ctx, w.mtd)
var str *grpcdynamic.ClientStream
var err error
if w.config.enableCompression {
str, err = w.stub.InvokeRpcClientStream(*ctx, w.mtd, grpc.UseCompressor(gzip.Name))
} else {
str, err = w.stub.InvokeRpcClientStream(*ctx, w.mtd)
}

if err != nil && w.config.hasLog {
w.config.log.Errorw("Invoke Client Streaming RPC call error: "+err.Error(), "workerID", w.workerID,
Expand Down Expand Up @@ -254,7 +275,7 @@ func (w *Worker) makeClientStreamingRequest(ctx *context.Context, input []*dynam
}

func (w *Worker) makeServerStreamingRequest(ctx *context.Context, input *dynamic.Message) error {
str, err := w.stub.InvokeRpcServerStream(*ctx, w.mtd, input)
str, err := w.stub.InvokeRpcServerStream(*ctx, w.mtd, input, grpc.UseCompressor(gzip.Name))

if err != nil && w.config.hasLog {
w.config.log.Errorw("Invoke Server Streaming RPC call error: "+err.Error(), "workerID", w.workerID,
Expand Down Expand Up @@ -284,7 +305,14 @@ func (w *Worker) makeServerStreamingRequest(ctx *context.Context, input *dynamic
}

func (w *Worker) makeBidiRequest(ctx *context.Context, input []*dynamic.Message) error {
str, err := w.stub.InvokeRpcBidiStream(*ctx, w.mtd)
var str *grpcdynamic.BidiStream
var err error
if w.config.enableCompression {
str, err = w.stub.InvokeRpcBidiStream(*ctx, w.mtd, grpc.UseCompressor(gzip.Name))
} else {
str, err = w.stub.InvokeRpcBidiStream(*ctx, w.mtd)
}

if err != nil {
if w.config.hasLog {
w.config.log.Errorw("Invoke Bidi RPC call error: "+err.Error(),
Expand Down
4 changes: 4 additions & 0 deletions www/docs/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,7 @@ Print the version.
### `-h`, `--help`

Show context-sensitive help (also try --help-long and --help-man).

### `-e`, `--enable-compression`

Enable gzip compression on requests.
1 change: 1 addition & 0 deletions www/docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Flags:
-d, --data= The call data as stringified JSON. If the value is '@' then the request contents are read from stdin.
-D, --data-file= File path for call data JSON file. Examples: /home/user/file.json or ./file.json.
-b, --binary The call data comes as serialized binary message or multiple count-prefixed messages read from stdin.
-e, --enable-compression Enable gzip compression on requests.
-B, --binary-file= File path for the call data as serialized binary message or multiple count-prefixed messages.
-m, --metadata= Request metadata as stringified JSON.
-M, --metadata-file= File path for call metadata JSON file. Examples: /home/user/metadata.json or ./metadata.json.
Expand Down

0 comments on commit bd585a0

Please sign in to comment.