Skip to content

Commit

Permalink
stream: make .destroy() interact better with write queue
Browse files Browse the repository at this point in the history
Make sure that it is safe to call the callback for `_write()`
even in the presence of `.destroy()` calls during that write.

In particular, letting the write queue continue processing would
previously have thrown an exception, because processing writes
after calling `.destroy()` is forbidden.

One test had to be modified to account for the fact that callbacks
for writes will now always be called, even when the stream
is destroyed during the process.

PR-URL: #24062
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
addaleax authored and MylesBorins committed Dec 26, 2018
1 parent 897114b commit 6ce4ef3
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ function onwrite(stream, er) {
onwriteError(stream, state, sync, er, cb);
else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state);
var finished = needFinish(state) || stream.destroyed;

if (!finished &&
!state.corked &&
Expand Down
59 changes: 59 additions & 0 deletions test/parallel/test-stream-write-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
'use strict';
require('../common');
const assert = require('assert');
const { Writable } = require('stream');

// Test interaction between calling .destroy() on a writable and pending
// writes.

for (const withPendingData of [ false, true ]) {
for (const useEnd of [ false, true ]) {
const callbacks = [];

const w = new Writable({
write(data, enc, cb) {
callbacks.push(cb);
},
// Effectively disable the HWM to observe 'drain' events more easily.
highWaterMark: 1
});

let chunksWritten = 0;
let drains = 0;
let finished = false;
w.on('drain', () => drains++);
w.on('finish', () => finished = true);

w.write('abc', () => chunksWritten++);
assert.strictEqual(chunksWritten, 0);
assert.strictEqual(drains, 0);
callbacks.shift()();
assert.strictEqual(chunksWritten, 1);
assert.strictEqual(drains, 1);

if (withPendingData) {
// Test 2 cases: There either is or is not data still in the write queue.
// (The second write will never actually get executed either way.)
w.write('def', () => chunksWritten++);
}
if (useEnd) {
// Again, test 2 cases: Either we indicate that we want to end the
// writable or not.
w.end('ghi', () => chunksWritten++);
} else {
w.write('ghi', () => chunksWritten++);
}

assert.strictEqual(chunksWritten, 1);
w.destroy();
assert.strictEqual(chunksWritten, 1);
callbacks.shift()();
assert.strictEqual(chunksWritten, 2);
assert.strictEqual(callbacks.length, 0);
assert.strictEqual(drains, 1);

// When we used `.end()`, we see the 'finished' event if and only if
// we actually finished processing the write queue.
assert.strictEqual(finished, !withPendingData && useEnd);
}
}

0 comments on commit 6ce4ef3

Please sign in to comment.