Skip to content

Commit

Permalink
Make receiving from bidirectional streams asynchronous to keep reques…
Browse files Browse the repository at this point in the history
…ts flowing
  • Loading branch information
Dariusz Jedrzejczyk committed Mar 15, 2019
1 parent 7cd212e commit 9ee88b1
Showing 1 changed file with 31 additions and 16 deletions.
47 changes: 31 additions & 16 deletions runner/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,17 +400,38 @@ func (b *Requester) makeServerStreamingRequest(ctx *context.Context, input *dyna

func (b *Requester) makeBidiRequest(ctx *context.Context, input *[]*dynamic.Message) error {
str, err := b.stub.InvokeRpcBidiStream(*ctx, b.mtd)
if err != nil {
fmt.Printf("Stream creation failed: %+v\n", err)
return err
}
counter := 0
// TODO: need to handle and propagate errors
for err == nil {
streamInput := *input
inputLen := len(streamInput)
if input == nil || inputLen == 0 {
// TODO: need to handle error
_ = str.CloseSend()
break

streamInput := *input
inputLen := len(streamInput)

recvDone := make(chan bool)

if input == nil || inputLen == 0 {
// TODO: need to handle error
_ = str.CloseSend()
return nil
}

go func() {
for {
_, err := str.RecvMsg()
if err != nil && err != io.EOF {
fmt.Printf("error receiving: %+v\n", err)
}
if err != nil {
close(recvDone)
return
}
}
}()

// TODO: need to handle and propagate errors
for err == nil {
if counter == inputLen {
// TODO: need to handle error
_ = str.CloseSend()
Expand All @@ -428,14 +449,8 @@ func (b *Requester) makeBidiRequest(ctx *context.Context, input *[]*dynamic.Mess
counter++
}

for err == nil {
_, err := str.RecvMsg()
if err != nil {
if err == io.EOF {
err = nil
}
break
}
if err == nil {
<-recvDone
}
return nil
}
Expand Down

0 comments on commit 9ee88b1

Please sign in to comment.