Skip to content

Commit

Permalink
Merge pull request #87 from criteo-forks/varied-data
Browse files Browse the repository at this point in the history
Different data for unary calls
  • Loading branch information
bojand authored Apr 25, 2019
2 parents ba7c451 + de50c21 commit d763723
Show file tree
Hide file tree
Showing 14 changed files with 436 additions and 132 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ build
node_modules
.cache
coverage.html

.idea
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ Flags:
--connections=1 Number of connections to use. Concurrency is distributed evenly among all the connections. Default is 1.
-d, --data= The call data as stringified JSON. If the value is '@' then the request contents are read from stdin.
-D, --data-file= File path for call data JSON file. Examples: /home/user/file.json or ./file.json.
-b, --binary The call data comes as serialized binary message read from stdin.
-B, --binary-file= File path for the call data as serialized binary message.
-b, --binary The call data comes as serialized binary message or multiple count-prefixed messages read from stdin.
-B, --binary-file= File path for the call data as serialized binary message or multiple count-prefixed messages.
-m, --metadata= Request metadata as stringified JSON.
-M, --metadata-file= File path for call metadata JSON file. Examples: /home/user/metadata.json or ./metadata.json.
--stream-interval=0 Interval for stream requests between message sends.
Expand Down
4 changes: 2 additions & 2 deletions cmd/ghz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ var (

data = kingpin.Flag("data", "The call data as stringified JSON. If the value is '@' then the request contents are read from stdin.").Short('d').PlaceHolder(" ").String()
dataPath = kingpin.Flag("data-file", "File path for call data JSON file. Examples: /home/user/file.json or ./file.json.").Short('D').PlaceHolder("PATH").PlaceHolder(" ").String()
binData = kingpin.Flag("binary", "The call data comes as serialized binary message read from stdin.").Short('b').Default("false").Bool()
binPath = kingpin.Flag("binary-file", "File path for the call data as serialized binary message.").Short('B').PlaceHolder(" ").String()
binData = kingpin.Flag("binary", "The call data comes as serialized binary message or multiple count-prefixed messages read from stdin.").Short('b').Default("false").Bool()
binPath = kingpin.Flag("binary-file", "File path for the call data as serialized binary message or multiple count-prefixed messages.").Short('B').PlaceHolder(" ").String()
md = kingpin.Flag("metadata", "Request metadata as stringified JSON.").Short('m').PlaceHolder(" ").String()
mdPath = kingpin.Flag("metadata-file", "File path for call metadata JSON file. Examples: /home/user/metadata.json or ./metadata.json.").Short('M').PlaceHolder(" ").String()
si = kingpin.Flag("stream-interval", "Interval for stream requests between message sends.").Default("0").Duration()
Expand Down
90 changes: 69 additions & 21 deletions internal/helloworld/greeter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package helloworld
import (
"fmt"
"io"
"math/rand"
"sync"
"time"

context "golang.org/x/net/context"
"google.golang.org/grpc/stats"
Expand All @@ -13,7 +15,7 @@ import (
// unary, client streaming, server streaming, bidi
type CallType string

// Unary is a uniry call
// Unary is a unary call
var Unary CallType = "unary"

// ClientStream is a client streaming call
Expand All @@ -33,22 +35,48 @@ type Greeter struct {

mutex *sync.RWMutex
callCounts map[CallType]int
calls map[CallType][][]*HelloRequest
}

func randomSleep() {
msCount := rand.Intn(4) + 1
time.Sleep(time.Millisecond * time.Duration(msCount))
}

func (s *Greeter) recordCall(ct CallType) int {
s.mutex.Lock()
defer s.mutex.Unlock()

s.callCounts[ct]++
var messages []*HelloRequest
s.calls[ct] = append(s.calls[ct], messages)

return len(s.calls[ct]) - 1
}

func (s *Greeter) recordMessage(ct CallType, callIdx int, msg *HelloRequest) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.calls[ct][callIdx] = append(s.calls[ct][callIdx], msg)
}

// SayHello implements helloworld.GreeterServer
func (s *Greeter) SayHello(ctx context.Context, in *HelloRequest) (*HelloReply, error) {
s.mutex.Lock()
s.callCounts[Unary]++
s.mutex.Unlock()
callIdx := s.recordCall(Unary)
s.recordMessage(Unary, callIdx, in)

randomSleep()

return &HelloReply{Message: "Hello " + in.Name}, nil
}

// SayHellos lists all hellos
func (s *Greeter) SayHellos(req *HelloRequest, stream Greeter_SayHellosServer) error {
s.mutex.Lock()
s.callCounts[ServerStream]++
s.mutex.Unlock()
callIdx := s.recordCall(ServerStream)
s.recordMessage(ServerStream, callIdx, req)

randomSleep()

for _, msg := range s.streamData {
if err := stream.Send(msg); err != nil {
Expand All @@ -61,30 +89,31 @@ func (s *Greeter) SayHellos(req *HelloRequest, stream Greeter_SayHellosServer) e

// SayHelloCS is client streaming handler
func (s *Greeter) SayHelloCS(stream Greeter_SayHelloCSServer) error {
s.mutex.Lock()
s.callCounts[ClientStream]++
s.mutex.Unlock()
callIdx := s.recordCall(ClientStream)

randomSleep()

msgCount := 0

for {
_, err := stream.Recv()
in, err := stream.Recv()
if err == io.EOF {
msgStr := fmt.Sprintf("Hello count: %d", msgCount)
return stream.SendAndClose(&HelloReply{Message: msgStr})
}
if err != nil {
return err
}
s.recordMessage(ClientStream, callIdx, in)
msgCount++
}
}

// SayHelloBidi duplex call handler
func (s *Greeter) SayHelloBidi(stream Greeter_SayHelloBidiServer) error {
s.mutex.Lock()
s.callCounts[Bidi]++
s.mutex.Unlock()
callIdx := s.recordCall(Bidi)

randomSleep()

for {
in, err := stream.Recv()
Expand All @@ -95,6 +124,7 @@ func (s *Greeter) SayHelloBidi(stream Greeter_SayHelloBidiServer) error {
return err
}

s.recordMessage(Bidi, callIdx, in)
msg := "Hello " + in.Name
if err := stream.Send(&HelloReply{Message: msg}); err != nil {
return err
Expand All @@ -105,10 +135,19 @@ func (s *Greeter) SayHelloBidi(stream Greeter_SayHelloBidiServer) error {
// ResetCounters resets the call counts
func (s *Greeter) ResetCounters() {
s.mutex.Lock()

s.callCounts = make(map[CallType]int)
s.callCounts[Unary] = 0
s.callCounts[ServerStream] = 0
s.callCounts[ClientStream] = 0
s.callCounts[Bidi] = 0

s.calls = make(map[CallType][][]*HelloRequest)
s.calls[Unary] = make([][]*HelloRequest, 0)
s.calls[ServerStream] = make([][]*HelloRequest, 0)
s.calls[ClientStream] = make([][]*HelloRequest, 0)
s.calls[Bidi] = make([][]*HelloRequest, 0)

s.mutex.Unlock()

if s.Stats != nil {
Expand All @@ -129,6 +168,18 @@ func (s *Greeter) GetCount(key CallType) int {
return -1
}

// GetCalls gets the received messages for specific call type
func (s *Greeter) GetCalls(key CallType) [][]*HelloRequest {
s.mutex.Lock()
val, ok := s.calls[key]
s.mutex.Unlock()

if ok {
return val
}
return nil
}

// GetConnectionCount gets the connection count
func (s *Greeter) GetConnectionCount() int {
return s.Stats.GetConnectionCount()
Expand All @@ -143,13 +194,10 @@ func NewGreeter() *Greeter {
&HelloReply{Message: "Hello Sara"},
}

m := make(map[CallType]int)
m[Unary] = 0
m[ServerStream] = 0
m[ClientStream] = 0
m[Bidi] = 0
greeter := &Greeter{streamData: streamData, mutex: &sync.RWMutex{}}
greeter.ResetCounters()

return &Greeter{streamData: streamData, callCounts: m, mutex: &sync.RWMutex{}}
return greeter
}

// NewHWStats creates new stats handler
Expand Down Expand Up @@ -197,4 +245,4 @@ func (c *HWStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
// TagRPC implements per-RPC context management.
func (c *HWStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
}
}
2 changes: 1 addition & 1 deletion runner/call_template_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// call template data
type callTemplateData struct {
WorkerID string // unique worker ID
RequestNumber int64 // unique incrememnted request number for each request
RequestNumber int64 // unique incremented request number for each request
FullyQualifiedName string // fully-qualified name of the method call
MethodName string // shorter call method name
ServiceName string // the service name
Expand Down
93 changes: 62 additions & 31 deletions runner/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runner
import (
"encoding/json"
"fmt"
"io"
"strings"

"github.com/golang/protobuf/jsonpb"
Expand All @@ -12,8 +13,8 @@ import (
)

// creates a message from a map
// marshal to JSON then use jsonb to marshal to message
// this way we follow protobuf more closely and allow cammelCase properties.
// marshal to JSON then use jsonpb to marshal to message
// this way we follow protobuf more closely and allow camelCase properties.
func messageFromMap(input *dynamic.Message, data *map[string]interface{}) error {
strData, err := json.Marshal(data)
if err != nil {
Expand All @@ -28,71 +29,101 @@ func messageFromMap(input *dynamic.Message, data *map[string]interface{}) error
return nil
}

func createPayloads(data string, mtd *desc.MethodDescriptor) (*dynamic.Message, *[]*dynamic.Message, error) {
func createPayloadsFromJSON(data string, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
md := mtd.GetInputType()
var input *dynamic.Message
var streamInput []*dynamic.Message
var inputs []*dynamic.Message

if len(data) > 0 {
if strings.IndexRune(data, '[') == 0 {
dataArray := make([]map[string]interface{}, 5)
err := json.Unmarshal([]byte(data), &dataArray)
if err != nil {
return nil, nil, fmt.Errorf("Error unmarshalling payload. Data: '%v' Error: %v", data, err.Error())
return nil, fmt.Errorf("Error unmarshalling payload. Data: '%v' Error: %v", data, err.Error())
}

elems := len(dataArray)
if elems > 0 {
streamInput = make([]*dynamic.Message, elems)
inputs = make([]*dynamic.Message, elems)
}

for i, elem := range dataArray {
elemMsg := dynamic.NewMessage(md)
err := messageFromMap(elemMsg, &elem)
if err != nil {
return nil, nil, fmt.Errorf("Error creating message: %v", err.Error())
return nil, fmt.Errorf("Error creating message: %v", err.Error())
}

streamInput[i] = elemMsg
inputs[i] = elemMsg
}
} else {
input = dynamic.NewMessage(md)
err := jsonpb.UnmarshalString(data, input)
inputs = make([]*dynamic.Message, 1)
inputs[0] = dynamic.NewMessage(md)
err := jsonpb.UnmarshalString(data, inputs[0])
if err != nil {
return nil, nil, fmt.Errorf("Error creating message from data. Data: '%v' Error: %v", data, err.Error())
return nil, fmt.Errorf("Error creating message from data. Data: '%v' Error: %v", data, err.Error())
}
}
}

if mtd.IsClientStreaming() && len(streamInput) == 0 && input != nil {
streamInput = make([]*dynamic.Message, 1)
streamInput[0] = input
input = nil
return &inputs, nil
}

func createPayloadsFromBinSingleMessage(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
var inputs []*dynamic.Message
md := mtd.GetInputType()

// return empty array if no data
if len(binData) == 0 {
return &inputs, nil
}

if !mtd.IsClientStreaming() && input == nil && len(streamInput) > 0 {
input = streamInput[0]
streamInput = nil
// try to unmarshal input as a single message
singleMessage := dynamic.NewMessage(md)
err := proto.Unmarshal(binData, singleMessage)
if err != nil {
return nil, fmt.Errorf("Error creating message from binary data: %v", err.Error())
}
inputs = make([]*dynamic.Message, 1)
inputs[0] = singleMessage

return input, &streamInput, nil
return &inputs, nil
}

func createPayloadsFromBin(binData []byte, mtd *desc.MethodDescriptor) (*dynamic.Message, *[]*dynamic.Message, error) {
func createPayloadsFromBinCountDelimited(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
var inputs []*dynamic.Message
md := mtd.GetInputType()
input := dynamic.NewMessage(md)
streamInput := make([]*dynamic.Message, 1)

err := proto.Unmarshal(binData, input)
if err != nil {
return nil, nil, fmt.Errorf("Error creating message from binary data: %v", err.Error())
// return empty array if no data
if len(binData) == 0 {
return &inputs, nil
}

if mtd.IsClientStreaming() && input != nil {
streamInput = make([]*dynamic.Message, 1)
streamInput[0] = input
input = nil
// try to unmarshal input as several count-delimited messages
buffer := proto.NewBuffer(binData)
for {
msg := dynamic.NewMessage(md)
err := buffer.DecodeMessage(msg)

if err == io.ErrUnexpectedEOF {
break
}

if err != nil {
return nil, fmt.Errorf("Error creating message from binary data: %v", err.Error())
}

inputs = append(inputs, msg)
}

return input, &streamInput, nil
return &inputs, nil
}

func createPayloadsFromBin(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
inputs, err := createPayloadsFromBinCountDelimited(binData, mtd)

if err == nil && len(*inputs) > 0 {
return inputs, err
}

return createPayloadsFromBinSingleMessage(binData, mtd)
}
Loading

0 comments on commit d763723

Please sign in to comment.