Skip to content

Commit

Permalink
createPayloads doesn't care about streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
ezsilmar committed Apr 18, 2019
1 parent 107b29c commit 83c25fe
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 78 deletions.
52 changes: 18 additions & 34 deletions runner/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,71 +28,55 @@ func messageFromMap(input *dynamic.Message, data *map[string]interface{}) error
return nil
}

func createPayloads(data string, mtd *desc.MethodDescriptor) (*dynamic.Message, *[]*dynamic.Message, error) {
func createPayloads(data string, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
md := mtd.GetInputType()
var input *dynamic.Message
var streamInput []*dynamic.Message
var inputs []*dynamic.Message

if len(data) > 0 {
if strings.IndexRune(data, '[') == 0 {
dataArray := make([]map[string]interface{}, 5)
err := json.Unmarshal([]byte(data), &dataArray)
if err != nil {
return nil, nil, fmt.Errorf("Error unmarshalling payload. Data: '%v' Error: %v", data, err.Error())
return nil, fmt.Errorf("Error unmarshalling payload. Data: '%v' Error: %v", data, err.Error())
}

elems := len(dataArray)
if elems > 0 {
streamInput = make([]*dynamic.Message, elems)
inputs = make([]*dynamic.Message, elems)
}

for i, elem := range dataArray {
elemMsg := dynamic.NewMessage(md)
err := messageFromMap(elemMsg, &elem)
if err != nil {
return nil, nil, fmt.Errorf("Error creating message: %v", err.Error())
return nil, fmt.Errorf("Error creating message: %v", err.Error())
}

streamInput[i] = elemMsg
inputs[i] = elemMsg
}
} else {
input = dynamic.NewMessage(md)
err := jsonpb.UnmarshalString(data, input)
inputs = make([]*dynamic.Message, 1)
inputs[0] = dynamic.NewMessage(md)
err := jsonpb.UnmarshalString(data, inputs[0])
if err != nil {
return nil, nil, fmt.Errorf("Error creating message from data. Data: '%v' Error: %v", data, err.Error())
return nil, fmt.Errorf("Error creating message from data. Data: '%v' Error: %v", data, err.Error())
}
}
}

if mtd.IsClientStreaming() && len(streamInput) == 0 && input != nil {
streamInput = make([]*dynamic.Message, 1)
streamInput[0] = input
input = nil
}

if !mtd.IsClientStreaming() && input == nil && len(streamInput) > 0 {
input = streamInput[0]
streamInput = nil
}

return input, &streamInput, nil
return &inputs, nil
}

func createPayloadsFromBin(binData []byte, mtd *desc.MethodDescriptor) (*dynamic.Message, *[]*dynamic.Message, error) {
func createPayloadsFromBin(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
md := mtd.GetInputType()
input := dynamic.NewMessage(md)
streamInput := make([]*dynamic.Message, 1)

err := proto.Unmarshal(binData, input)
if err != nil {
return nil, nil, fmt.Errorf("Error creating message from binary data: %v", err.Error())
}
inputs := make([]*dynamic.Message, 1)
inputs[0] = dynamic.NewMessage(md)

if mtd.IsClientStreaming() && input != nil {
streamInput = make([]*dynamic.Message, 1)
streamInput[0] = input
input = nil
err := proto.Unmarshal(binData, inputs[0])
if err != nil {
return nil, fmt.Errorf("Error creating message from binary data: %v", err.Error())
}

return input, &streamInput, nil
return &inputs, nil
}
73 changes: 38 additions & 35 deletions runner/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@ func TestData_createPayloads(t *testing.T) {
assert.NotNil(t, mtdTestUnaryTwo)

t.Run("get nil, empty when empty", func(t *testing.T) {
single, streaming, err := createPayloads("", mtdUnary)
inputs, err := createPayloads("", mtdUnary)
assert.NoError(t, err)
assert.Nil(t, single)
assert.Empty(t, streaming)
assert.Empty(t, inputs)
})

t.Run("fail for invalid data shape", func(t *testing.T) {
Expand All @@ -56,22 +55,24 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

single, streaming, err := createPayloads(string(jsonData), mtdUnary)
inputs, err := createPayloads(string(jsonData), mtdUnary)
assert.Error(t, err)
assert.Nil(t, single)
assert.Nil(t, streaming)
assert.Nil(t, inputs)
})

// TODO: update tests below that comment

t.Run("create single object from map for unary", func(t *testing.T) {
m1 := make(map[string]interface{})
m1["name"] = "bob"

jsonData, _ := json.Marshal(m1)

single, streaming, err := createPayloads(string(jsonData), mtdUnary)
inputs, err := createPayloads(string(jsonData), mtdUnary)
assert.NoError(t, err)
assert.NotNil(t, single)
assert.Empty(t, streaming)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
})

t.Run("create array from map for client streaming", func(t *testing.T) {
Expand All @@ -80,11 +81,11 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

single, streaming, err := createPayloads(string(jsonData), mtdClientStreaming)
inputs, err := createPayloads(string(jsonData), mtdClientStreaming)
assert.NoError(t, err)
assert.Nil(t, single)
assert.NotNil(t, streaming)
assert.Len(t, *streaming, 1)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
})

t.Run("create slice of messages from slice for client streaming", func(t *testing.T) {
Expand All @@ -98,11 +99,10 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(s)

single, streaming, err := createPayloads(string(jsonData), mtdClientStreaming)
inputs, err := createPayloads(string(jsonData), mtdClientStreaming)
assert.NoError(t, err)
assert.Nil(t, single)
assert.NotNil(t, streaming)
assert.Len(t, *streaming, 2)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 2)
})

t.Run("fail on invalid shape of data in slice for client streaming", func(t *testing.T) {
Expand All @@ -120,10 +120,9 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(s)

single, streaming, err := createPayloads(string(jsonData), mtdClientStreaming)
inputs, err := createPayloads(string(jsonData), mtdClientStreaming)
assert.Error(t, err)
assert.Nil(t, single)
assert.Nil(t, streaming)
assert.Nil(t, inputs)
})

t.Run("get object for slice and unary", func(t *testing.T) {
Expand All @@ -140,10 +139,10 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(s)

single, streaming, err := createPayloads(string(jsonData), mtdUnary)
inputs, err := createPayloads(string(jsonData), mtdUnary)
assert.NoError(t, err)
assert.NotNil(t, single)
assert.Empty(t, streaming)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 3)
})

t.Run("create single object from map for unary with camelCase property", func(t *testing.T) {
Expand All @@ -152,10 +151,11 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

single, streaming, err := createPayloads(string(jsonData), mtdTestUnary)
inputs, err := createPayloads(string(jsonData), mtdTestUnary)
assert.NoError(t, err)
assert.NotNil(t, single)
assert.Empty(t, streaming)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
})

t.Run("create single object from map for unary with snake_case property", func(t *testing.T) {
Expand All @@ -164,10 +164,11 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

single, streaming, err := createPayloads(string(jsonData), mtdTestUnary)
inputs, err := createPayloads(string(jsonData), mtdTestUnary)
assert.NoError(t, err)
assert.NotNil(t, single)
assert.Empty(t, streaming)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
})

t.Run("create single object from map for unary with nested camelCase property", func(t *testing.T) {
Expand All @@ -179,10 +180,11 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

single, streaming, err := createPayloads(string(jsonData), mtdTestUnaryTwo)
inputs, err := createPayloads(string(jsonData), mtdTestUnaryTwo)
assert.NoError(t, err)
assert.NotNil(t, single)
assert.Empty(t, streaming)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
})

t.Run("create single object from map for unary with nested snake_case property", func(t *testing.T) {
Expand All @@ -194,9 +196,10 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

single, streaming, err := createPayloads(string(jsonData), mtdTestUnaryTwo)
inputs, err := createPayloads(string(jsonData), mtdTestUnaryTwo)
assert.NoError(t, err)
assert.NotNil(t, single)
assert.Empty(t, streaming)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
})
}
17 changes: 8 additions & 9 deletions runner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"google.golang.org/grpc/metadata"
)

// Worker is used for doing a single stream of requests in parallerl
// Worker is used for doing a single stream of requests in parallel
type Worker struct {
stub grpcdynamic.Stub
mtd *desc.MethodDescriptor
Expand Down Expand Up @@ -57,21 +57,20 @@ func (w *Worker) makeRequest() error {

ctd := newCallTemplateData(w.mtd, w.workerID, reqNum)

var input *dynamic.Message
var streamInput *[]*dynamic.Message
var inputs *[]*dynamic.Message

if !w.config.binary {
data, err := ctd.executeData(string(w.config.data))
if err != nil {
return err
}
input, streamInput, err = createPayloads(string(data), w.mtd)
inputs, err = createPayloads(string(data), w.mtd)
if err != nil {
return err
}
} else {
var err error
input, streamInput, err = createPayloadsFromBin(w.config.data, w.mtd)
inputs, err = createPayloadsFromBin(w.config.data, w.mtd)
if err != nil {
return err
}
Expand Down Expand Up @@ -106,17 +105,17 @@ func (w *Worker) makeRequest() error {
// RPC errors are handled via stats handler

if w.mtd.IsClientStreaming() && w.mtd.IsServerStreaming() {
_ = w.makeBidiRequest(&ctx, streamInput)
_ = w.makeBidiRequest(&ctx, inputs)
}
if w.mtd.IsClientStreaming() {
_ = w.makeClientStreamingRequest(&ctx, streamInput)
_ = w.makeClientStreamingRequest(&ctx, inputs)
}
if w.mtd.IsServerStreaming() {
_ = w.makeServerStreamingRequest(&ctx, input)
_ = w.makeServerStreamingRequest(&ctx, (*inputs)[0])
}

// TODO: handle response?
_, _ = w.stub.InvokeRpc(ctx, w.mtd, input)
_, _ = w.stub.InvokeRpc(ctx, w.mtd, (*inputs)[0])
return err
}

Expand Down

0 comments on commit 83c25fe

Please sign in to comment.