diff --git a/runner/requester.go b/runner/requester.go index 2fd026d8..ee23772e 100644 --- a/runner/requester.go +++ b/runner/requester.go @@ -400,17 +400,38 @@ func (b *Requester) makeServerStreamingRequest(ctx *context.Context, input *dyna func (b *Requester) makeBidiRequest(ctx *context.Context, input *[]*dynamic.Message) error { str, err := b.stub.InvokeRpcBidiStream(*ctx, b.mtd) + if err != nil { + fmt.Printf("Stream creation failed: %+v\n", err) + return err + } counter := 0 - // TODO: need to handle and propagate errors - for err == nil { - streamInput := *input - inputLen := len(streamInput) - if input == nil || inputLen == 0 { - // TODO: need to handle error - _ = str.CloseSend() - break + + streamInput := *input + inputLen := len(streamInput) + + recvDone := make(chan bool) + + if input == nil || inputLen == 0 { + // TODO: need to handle error + _ = str.CloseSend() + return nil + } + + go func() { + for { + _, err := str.RecvMsg() + if err != nil && err != io.EOF { + fmt.Printf("error receiving: %+v\n", err) + } + if err != nil { + close(recvDone) + return + } } + }() + // TODO: need to handle and propagate errors + for err == nil { if counter == inputLen { // TODO: need to handle error _ = str.CloseSend() @@ -428,14 +449,8 @@ func (b *Requester) makeBidiRequest(ctx *context.Context, input *[]*dynamic.Mess counter++ } - for err == nil { - _, err := str.RecvMsg() - if err != nil { - if err == io.EOF { - err = nil - } - break - } + if err == nil { + <-recvDone } return nil }