Skip to content

Commit

Permalink
stream: implement finished() for ReadableStream and WritableStream
Browse files Browse the repository at this point in the history
Refs: #39316
PR-URL: #46205
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Darshan Sen <[email protected]>
Reviewed-By: James M Snell <[email protected]>
debadree25 authored and RafaelGSS committed Jan 20, 2023

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 4f491d3 commit e5f53b5
Showing 5 changed files with 301 additions and 9 deletions.
25 changes: 20 additions & 5 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
@@ -22,20 +22,23 @@ const {
validateBoolean
} = require('internal/validators');

const { Promise } = primordials;
const { Promise, PromisePrototypeThen } = primordials;

const {
isClosed,
isReadable,
isReadableNodeStream,
isReadableStream,
isReadableFinished,
isReadableErrored,
isWritable,
isWritableNodeStream,
isWritableStream,
isWritableFinished,
isWritableErrored,
isNodeStream,
willEmitClose: _willEmitClose,
kIsClosedPromise,
} = require('internal/streams/utils');

function isRequest(stream) {
@@ -58,14 +61,17 @@ function eos(stream, options, callback) {

callback = once(callback);

const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);
if (isReadableStream(stream) || isWritableStream(stream)) {
return eosWeb(stream, options, callback);
}

if (!isNodeStream(stream)) {
// TODO: Webstreams.
throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream);
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
}

const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);

const wState = stream._writableState;
const rState = stream._readableState;

@@ -255,6 +261,15 @@ function eos(stream, options, callback) {
return cleanup;
}

function eosWeb(stream, opts, callback) {
PromisePrototypeThen(
stream[kIsClosedPromise].promise,
() => process.nextTick(() => callback.call(stream)),
(err) => process.nextTick(() => callback.call(stream, err)),
);
return nop;
}

function finished(stream, opts) {
let autoCleanup = false;
if (opts === null) {
25 changes: 25 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
@@ -4,13 +4,16 @@ const {
Symbol,
SymbolAsyncIterator,
SymbolIterator,
SymbolFor,
} = primordials;

const kDestroyed = Symbol('kDestroyed');
const kIsErrored = Symbol('kIsErrored');
const kIsReadable = Symbol('kIsReadable');
const kIsDisturbed = Symbol('kIsDisturbed');

const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');

function isReadableNodeStream(obj, strict = false) {
return !!(
obj &&
@@ -55,6 +58,25 @@ function isNodeStream(obj) {
);
}

function isReadableStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.pipeThrough === 'function' &&
typeof obj.getReader === 'function' &&
typeof obj.cancel === 'function'
);
}

function isWritableStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.getWriter === 'function' &&
typeof obj.abort === 'function'
);
}

function isIterable(obj, isAsync) {
if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
@@ -269,18 +291,21 @@ module.exports = {
kIsErrored,
isReadable,
kIsReadable,
kIsClosedPromise,
isClosed,
isDestroyed,
isDuplexNodeStream,
isFinished,
isIterable,
isReadableNodeStream,
isReadableStream,
isReadableEnded,
isReadableFinished,
isReadableErrored,
isNodeStream,
isWritable,
isWritableNodeStream,
isWritableStream,
isWritableEnded,
isWritableFinished,
isWritableErrored,
14 changes: 11 additions & 3 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
@@ -85,6 +85,7 @@ const {
kIsDisturbed,
kIsErrored,
kIsReadable,
kIsClosedPromise,
} = require('internal/streams/utils');

const {
@@ -258,9 +259,11 @@ class ReadableStream {
port1: undefined,
port2: undefined,
promise: undefined,
}
},
};

this[kIsClosedPromise] = createDeferredPromise();

// The spec requires handling of the strategy first
// here. Specifically, if getting the size and
// highWaterMark from the strategy fail, that has
@@ -652,8 +655,9 @@ function TransferredReadableStream() {
writable: undefined,
port: undefined,
promise: undefined,
}
},
};
this[kIsClosedPromise] = createDeferredPromise();
},
[], ReadableStream));
}
@@ -1213,8 +1217,9 @@ function createTeeReadableStream(start, pull, cancel) {
writable: undefined,
port: undefined,
promise: undefined,
}
},
};
this[kIsClosedPromise] = createDeferredPromise();
setupReadableStreamDefaultControllerFromSource(
this,
ObjectCreate(null, {
@@ -1887,6 +1892,7 @@ function readableStreamCancel(stream, reason) {
function readableStreamClose(stream) {
assert(stream[kState].state === 'readable');
stream[kState].state = 'closed';
stream[kIsClosedPromise].resolve();

const {
reader,
@@ -1908,6 +1914,8 @@ function readableStreamError(stream, error) {
assert(stream[kState].state === 'readable');
stream[kState].state = 'errored';
stream[kState].storedError = error;
stream[kIsClosedPromise].reject(error);
setPromiseHandled(stream[kIsClosedPromise].promise);

const {
reader
14 changes: 13 additions & 1 deletion lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
@@ -69,6 +69,10 @@ const {
kState,
} = require('internal/webstreams/util');

const {
kIsClosedPromise,
} = require('internal/streams/utils');

const {
AbortController,
} = require('internal/abort_controller');
@@ -191,9 +195,11 @@ class WritableStream {
port1: undefined,
port2: undefined,
promise: undefined,
}
},
};

this[kIsClosedPromise] = createDeferredPromise();

const size = extractSizeAlgorithm(strategy?.size);
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);

@@ -363,6 +369,7 @@ function TransferredWritableStream() {
readable: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
},
[], WritableStream));
}
@@ -742,6 +749,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
resolve: undefined,
};
}

stream[kIsClosedPromise].reject(stream[kState]?.storedError);
setPromiseHandled(stream[kIsClosedPromise].promise);

const {
writer,
} = stream[kState];
@@ -855,6 +866,7 @@ function writableStreamFinishInFlightClose(stream) {
stream[kState].state = 'closed';
if (stream[kState].writer !== undefined)
stream[kState].writer[kState].close.resolve?.();
stream[kIsClosedPromise].resolve?.();
assert(stream[kState].pendingAbortRequest.abort.promise === undefined);
assert(stream[kState].storedError === undefined);
}
232 changes: 232 additions & 0 deletions test/parallel/test-webstreams-finished.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { ReadableStream, WritableStream } = require('stream/web');
const { finished } = require('stream');
const { finished: finishedPromise } = require('stream/promises');

{
const rs = new ReadableStream({
start(controller) {
controller.enqueue('asd');
controller.close();
},
});
finished(rs, common.mustSucceed());
async function test() {
const values = [];
for await (const chunk of rs) {
values.push(chunk);
}
assert.deepStrictEqual(values, ['asd']);
}
test();
}

{
const rs = new ReadableStream({
start(controller) {
controller.error(new Error('asd'));
}
});

finished(rs, common.mustCall((err) => {
assert.strictEqual(err?.message, 'asd');
}));
}

{
const rs = new ReadableStream({
async start(controller) {
throw new Error('asd');
}
});

finished(rs, common.mustCall((err) => {
assert.strictEqual(err?.message, 'asd');
}));
}

{
const rs = new ReadableStream({
start(controller) {
controller.enqueue('asd');
controller.close();
}
});

async function test() {
const values = [];
for await (const chunk of rs) {
values.push(chunk);
}
assert.deepStrictEqual(values, ['asd']);
}

finishedPromise(rs).then(common.mustSucceed());

test();
}

{
const rs = new ReadableStream({
start(controller) {
controller.error(new Error('asd'));
}
});

finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => {
assert.strictEqual(err?.message, 'asd');
}));
}

{
const rs = new ReadableStream({
async start(controller) {
throw new Error('asd');
}
});

finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => {
assert.strictEqual(err?.message, 'asd');
}));
}

{
const rs = new ReadableStream({
start(controller) {
controller.enqueue('asd');
controller.close();
}
});

const { 0: s1, 1: s2 } = rs.tee();

finished(s1, common.mustSucceed());
finished(s2, common.mustSucceed());

async function test(stream) {
const values = [];
for await (const chunk of stream) {
values.push(chunk);
}
assert.deepStrictEqual(values, ['asd']);
}

Promise.all([
test(s1),
test(s2),
]).then(common.mustCall());
}

{
const rs = new ReadableStream({
start(controller) {
controller.error(new Error('asd'));
}
});

const { 0: s1, 1: s2 } = rs.tee();

finished(s1, common.mustCall((err) => {
assert.strictEqual(err?.message, 'asd');
}));

finished(s2, common.mustCall((err) => {
assert.strictEqual(err?.message, 'asd');
}));
}

{
const rs = new ReadableStream({
start(controller) {
controller.enqueue('asd');
controller.close();
}
});

finished(rs, common.mustSucceed());

rs.cancel();
}

{
let str = '';
const ws = new WritableStream({
write(chunk) {
str += chunk;
}
});

finished(ws, common.mustSucceed(() => {
assert.strictEqual(str, 'asd');
}));

const writer = ws.getWriter();
writer.write('asd');
writer.close();
}

{
const ws = new WritableStream({
async write(chunk) {
throw new Error('asd');
}
});

finished(ws, common.mustCall((err) => {
assert.strictEqual(err?.message, 'asd');
}));

const writer = ws.getWriter();
writer.write('asd').catch((err) => {
assert.strictEqual(err?.message, 'asd');
});
}

{
let str = '';
const ws = new WritableStream({
write(chunk) {
str += chunk;
}
});

finishedPromise(ws).then(common.mustSucceed(() => {
assert.strictEqual(str, 'asd');
}));

const writer = ws.getWriter();
writer.write('asd');
writer.close();
}

{
const ws = new WritableStream({
write(chunk) { }
});
finished(ws, common.mustCall((err) => {
assert.strictEqual(err?.message, 'asd');
}));

const writer = ws.getWriter();
writer.abort(new Error('asd'));
}

{
const ws = new WritableStream({
async write(chunk) {
throw new Error('asd');
}
});

finishedPromise(ws).then(common.mustNotCall()).catch(common.mustCall((err) => {
assert.strictEqual(err?.message, 'asd');
}));

const writer = ws.getWriter();
writer.write('asd').catch((err) => {
assert.strictEqual(err?.message, 'asd');
});
}

0 comments on commit e5f53b5

Please sign in to comment.