Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix child process stdio data loss with slow piped consumers #7185

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,11 @@ util.inherits(ChildProcess, EventEmitter);
function flushStdio(subprocess) {
if (subprocess.stdio == null) return;
subprocess.stdio.forEach(function(stream, fd, stdio) {
if (!stream || !stream.readable || stream._readableState.readableListening)
if (!stream || !stream.readable ||
stream._readableState.readableListening ||
stream._readableState.pipesCount > 0) {
return;
}
stream.resume();
});
}
Expand Down
39 changes: 36 additions & 3 deletions test/parallel/test-child-process-flush-stdio.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
const cp = require('child_process');
const common = require('../common');
const assert = require('assert');
const stream = require('stream');

const p = cp.spawn('echo');

Expand All @@ -20,10 +21,42 @@ function spawnWithReadable() {
assert.strictEqual(code, 0);
assert.strictEqual(signal, null);
assert.strictEqual(Buffer.concat(buffer).toString().trim(), '123');
spawnWithPipe();
}));
p.stdout.on('readable', function() {
let buf;
while (buf = this.read())
buffer.push(buf);
setImmediate(() => {
let buf;
while (buf = this.read())
buffer.push(buf);
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change necessary?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This testcase was added to cover the issue fixed here: balena-io-experimental@12274a5

But the testcase passes even if the change of that commit is reverted. So I just fixed the testcase to reflect what was actually fixed there.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will PR this separately

Copy link
Member

@addaleax addaleax Jun 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@petrosagg Can confirm that this test case worked even before 12274a5 – nice catch! A separate PR seems like a good idea, yep

});
}

function spawnWithPipe() {
const buffer = [];
const through = new stream.PassThrough();
const p = cp.spawn('seq', [ '36000' ]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think something more cross-platform than seq might be nice here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I can PR just the testcase separately although I'm not sure if it adds any value since the issue turned out to be in the stream implementation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@petrosagg Too many tests never hurt anyone, so again thumbs up for a separate PR. ;)


const ret = [];
for (let i = 1; i <= 36000; i++) {
ret.push(i);
}

p.on('close', common.mustCall(function(code, signal) {
assert.strictEqual(code, 0);
assert.strictEqual(signal, null);
assert.strictEqual(Buffer.concat(buffer).toString().trim(), ret.join('\n'));
}));

p.on('exit', common.mustCall(function() {
Copy link
Member

@addaleax addaleax Jun 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should read close, as there’s currently no guarantee that 'close' will be emitted before 'exit'.

setImmediate(function() {
through.on('readable', function() {
let buf;
while (buf = this.read())
buffer.push(buf);
});
});
}));

p.stdout.pipe(through);
}