Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Avoid closing the stream if server still open #47

Merged
merged 5 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions examples/grpc_client_streaming.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ export default () => {

// close the client stream
stream.end();

sleep(1);
};

const pointSender = (stream, point) => {
Expand Down
2 changes: 1 addition & 1 deletion grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (mi *ModuleInstance) stream(c goja.ConstructorCall) *goja.Object {
instanceMetrics: mi.metrics,
builtinMetrics: mi.vu.State().BuiltinMetrics,
done: make(chan struct{}),
state: opened,
writingState: opened,

writeQueueCh: make(chan message),

Expand Down
40 changes: 25 additions & 15 deletions grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type message struct {

const (
opened = iota + 1
closing
closed
)

Expand All @@ -51,8 +50,8 @@ type stream struct {

obj *goja.Object // the object that is given to js to interact with the stream

state int8
done chan struct{}
writingState int8
done chan struct{}

writeQueueCh chan message

Expand Down Expand Up @@ -185,17 +184,19 @@ func (s *stream) readData(wg *sync.WaitGroup) {
return
}

if len(msg) == 0 && isRegularClosing(err) {
if len(msg) > 0 {
s.queueMessage(msg)
}

if isRegularClosing(err) {
s.logger.WithError(err).Debug("stream is cancelled/finished")

s.tq.Queue(func() error {
return s.closeWithError(nil)
return s.closeWithError(err)
})

return
}

s.queueMessage(msg)
}
}

Expand Down Expand Up @@ -299,7 +300,7 @@ func (s *stream) on(event string, listener func(goja.Value) (goja.Value, error))

// write writes a message to the stream
func (s *stream) write(input goja.Value) {
if s.state != opened {
if s.writingState != opened {
return
}

Expand All @@ -320,11 +321,13 @@ func (s *stream) write(input goja.Value) {

// end closes client the stream
func (s *stream) end() {
if s.state == closed || s.state == closing {
if s.writingState == closed {
return
}

s.state = closing
s.logger.Debugf("finishing stream %s writing", s.method)

s.writingState = closed
s.writeQueueCh <- message{isClosing: true}
}

Expand All @@ -334,16 +337,23 @@ func (s *stream) closeWithError(err error) error {
return s.callErrorListeners(err)
}

// close changes the stream state to closed and triggers the end event listeners
// close closes the stream and call end event listeners
// Note: in the regular closing the io.EOF could come
func (s *stream) close(err error) {
if s.state == closed {
if err == nil {
mstoykov marked this conversation as resolved.
Show resolved Hide resolved
return
}

s.logger.WithError(err).Debug("stream is closing")
select {
case <-s.done:
s.logger.Debugf("stream %v is already closed", s.method)
return
default:
}

s.state = closed
s.logger.Debugf("stream %s is closing", s.method)
close(s.done)

s.tq.Queue(func() error {
return s.callEventListeners(eventEnd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a separate event for the stream "ending" in one direction but not the other?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm not sure if I'm getting what is the concern here 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this events measn the whole stream has ended - and will in prtactice be called once. But now it either will mean "the writing direction is closed /ended" or will mean "either the writing or reading direction has ended".

Given that it seems like it will be beneficial to close one or the other and continue getting other/data events- it seems beneficial to also be able ot recognise when one or the other direction has ended.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this events measn the whole stream has ended - and will in prtactice be called once. But now it either will mean "the writing direction is closed /ended" or will mean "either the writing or reading direction has ended".

But it's not. The behavior remains. The stream.on('end', () => {}) will be triggered only once, when the stream is closed (for both writing and reading).

And I see no need for ending event since that moment happens exact after calling stream.end()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean, but I think the bigger problem is that I personally was mistaken in my understanding of how this is suppsoed to work.

Having taken some time to look at some nodejs exampels it seems to me like .end() the method only closes the writing part and the end event only signals closing the reading part.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been trying to figure out what happens if you try to write to a stream that that was readonly ... and ... I am not certain what happens as I feel the current examples are not complicated enough :(

And I have been trying to not try to write completely new examples just to get them wrong :(

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it isn't a good idea to look at the nodejs's docs/examples to get an understanding of how our implementation works 🤷 .

Yes, we have a kind of similar API, but it's not the same and in our docs, we're pretty explicit on how it is supposed to work. Like stream.end signals that the client finished the sending and stream.on('end', () => {}) happens when the stream closes.

And I believe our current behavior on's end is better in k6's context since it could help our users to put some checks there since we declare that this is the moment when the stream finished and everything that should happen is happen or not 😅

I have been trying to figure out what happens if you try to write to a stream that that was readonly ... and ... I am not certain what happens as I feel the current examples are not complicated enough :(

I might think that here you mean two different cases:

The first case is about the stream being always read-only (a.k.a server-side stream). In that case, we could learn it from the definitions, I believe, and just return trigger an error (or write a log that there was an attempt to write to read-only stream).

The second case is more interesting 🤔 , but I'm not sure that it is realistic. It's the case when the server finishes the stream while the client still wants to send data. On the other side, I believe even in that case, the last word is on the server to report that it's all done (io.EOF in the read from server part).

})
Expand All @@ -354,7 +364,7 @@ func (s *stream) close(err error) {
}

func (s *stream) callErrorListeners(e error) error {
if e == nil {
if e == nil || errors.Is(e, io.EOF) {
return nil
}

Expand Down
96 changes: 96 additions & 0 deletions grpc/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"context"
"testing"
"time"

"github.com/dop251/goja"
"github.com/grafana/xk6-grpc/grpc/testutils/grpcservice"
Expand Down Expand Up @@ -188,6 +189,101 @@ func TestStream_ErrorHandling(t *testing.T) {
)
}

// this test case is checking that everything that server sends
// after the client finished (client.end called) is delivered to the client
// and the end event is called
func TestStream_ReceiveAllServerResponsesAfterEnd(t *testing.T) {
t.Parallel()

ts := newTestState(t)

stub := &FeatureExplorerStub{}

savedFeatures := []*grpcservice.Feature{
{
Name: "foo",
Location: &grpcservice.Point{
Latitude: 1,
Longitude: 2,
},
},
{
Name: "bar",
Location: &grpcservice.Point{
Latitude: 3,
Longitude: 4,
},
},
}

stub.listFeatures = func(rect *grpcservice.Rectangle, stream grpcservice.FeatureExplorer_ListFeaturesServer) error {
for _, feature := range savedFeatures {
// adding a delay to make server response "slower"
time.Sleep(200 * time.Millisecond)

if err := stream.Send(feature); err != nil {
return err
}
}

return nil
}

grpcservice.RegisterFeatureExplorerServer(ts.httpBin.ServerGRPC, stub)

replace := func(code string) (goja.Value, error) {
olegbespalov marked this conversation as resolved.
Show resolved Hide resolved
return ts.VU.Runtime().RunString(ts.httpBin.Replacer.Replace(code))
}

initString := codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../grpc/testutils/grpcservice/route_guide.proto");`,
}
vuString := codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
let stream = new grpc.Stream(client, "main.FeatureExplorer/ListFeatures")
stream.on('data', function (data) {
call('Feature:' + data.name);
});
stream.on('end', function () {
call('End called');
});
stream.write({
lo: {
latitude: 1,
longitude: 2,
},
hi: {
latitude: 1,
longitude: 2,
},
});
stream.end();
`,
}

val, err := replace(initString.code)
assertResponse(t, initString, err, val, ts)

ts.ToVUContext()

val, err = replace(vuString.code)

ts.EventLoop.WaitOnRegistered()

assertResponse(t, vuString, err, val, ts)

assert.Equal(t, ts.callRecorder.Recorded(), []string{
"Feature:foo",
"Feature:bar",
"End called",
},
)
}

// FeatureExplorerStub is a stub for FeatureExplorerServer
// it has ability to override methods
type FeatureExplorerStub struct {
Expand Down
Loading