diff --git a/index.bs b/index.bs index d9db80921..542556f4e 100644 --- a/index.bs +++ b/index.bs @@ -192,7 +192,7 @@ longer active. At this point another reader can be acquired at will. the chain:
- readableStream.pipeTo(writableStream)
+ readableStream.pipeTo(writableStream).finished
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
@@ -924,7 +924,7 @@ a variable stream, that performs the following steps:
chance to slow down its data production.
- readableStream.pipeTo(writableStream)
+ readableStream.pipeTo(writableStream).finished
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
@@ -1839,7 +1839,7 @@ writable stream:
const webSocketStream = makeReadableWebSocketStream("wss://example.com", 443);
- webSocketStream.pipeTo(writableStream)
+ webSocketStream.pipeTo(writableStream).finished
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
@@ -1972,7 +1972,7 @@ We can then use this function to create writable streams for a web socket, and p
const webSocketStream = makeWritableWebSocketStream("wss://example.com", 443);
- readableStream.pipeTo(webSocketStream)
+ readableStream.pipeTo(webSocketStream).finished
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js
index 5ffb4720b..16b324a0e 100644
--- a/reference-implementation/lib/readable-stream.js
+++ b/reference-implementation/lib/readable-stream.js
@@ -72,12 +72,13 @@ export default class ReadableStream {
let reader;
let lastRead;
let closedPurposefully = false;
- let resolvePipeToPromise;
- let rejectPipeToPromise;
+ let unpiped = false;
+ let resolveFinishedPromise;
+ let rejectFinishedPromise;
- return new Promise((resolve, reject) => {
- resolvePipeToPromise = resolve;
- rejectPipeToPromise = reject;
+ const finishedPromise = new Promise((resolve, reject) => {
+ resolveFinishedPromise = resolve;
+ rejectFinishedPromise = reject;
reader = source.getReader();
@@ -94,9 +95,25 @@ export default class ReadableStream {
doPipe();
});
+ return { finished: finishedPromise, unpipe };
+
+ function unpipe() {
+ unpiped = true;
+ return lastRead.then(finishUnpipe, finishUnpipe);
+
+ function finishUnpipe() {
+ reader.releaseLock();
+ resolveFinishedPromise(undefined);
+ }
+ }
+
function doPipe() {
lastRead = reader.read();
Promise.all([lastRead, dest.ready]).then(([{ value, done }]) => {
+ if (unpiped === true) {
+ return;
+ }
+
if (Boolean(done) === true) {
closeDest();
} else if (dest.state === 'writable') {
@@ -110,22 +127,30 @@ export default class ReadableStream {
}
function cancelSource(reason) {
+ if (unpiped === true) {
+ return;
+ }
+
if (preventCancel === false) {
reader.cancel(reason);
reader.releaseLock();
- rejectPipeToPromise(reason);
+ rejectFinishedPromise(reason);
} else {
// If we don't cancel, we need to wait for lastRead to finish before we're allowed to release.
// We don't need to handle lastRead failing because that will trigger abortDest which takes care of
// both of these.
lastRead.then(() => {
reader.releaseLock();
- rejectPipeToPromise(reason);
+ rejectFinishedPromise(reason);
});
}
}
function closeDest() {
+ if (unpiped === true) {
+ return;
+ }
+
// Does not need to wait for lastRead since it occurs only on source closed.
reader.releaseLock();
@@ -133,13 +158,17 @@ export default class ReadableStream {
const destState = dest.state;
if (preventClose === false && (destState === 'waiting' || destState === 'writable')) {
closedPurposefully = true;
- dest.close().then(resolvePipeToPromise, rejectPipeToPromise);
+ dest.close().then(resolveFinishedPromise, rejectFinishedPromise);
} else {
- resolvePipeToPromise();
+ resolveFinishedPromise();
}
}
function abortDest(reason) {
+ if (unpiped === true) {
+ return;
+ }
+
// Does not need to wait for lastRead since it only occurs on source errored.
reader.releaseLock();
@@ -147,7 +176,7 @@ export default class ReadableStream {
if (preventAbort === false) {
dest.abort(reason);
}
- rejectPipeToPromise(reason);
+ rejectFinishedPromise(reason);
}
}
diff --git a/reference-implementation/test/brand-checks.js b/reference-implementation/test/brand-checks.js
index f94ab50c6..305995f9e 100644
--- a/reference-implementation/test/brand-checks.js
+++ b/reference-implementation/test/brand-checks.js
@@ -15,7 +15,9 @@ function fakeReadableStream() {
get closed() { return Promise.resolve(); },
cancel(reason) { return Promise.resolve(); },
pipeThrough({ writable, readable }, options) { return readable; },
- pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { return Promise.resolve(); },
+ pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) {
+ return { finished: Promise.resolve(), unpipe() { } };
+ },
getReader() { return new ReadableStream(new ReadableStream()); }
};
}
diff --git a/reference-implementation/test/pipe-through.js b/reference-implementation/test/pipe-through.js
index 079052e86..14597ae9b 100644
--- a/reference-implementation/test/pipe-through.js
+++ b/reference-implementation/test/pipe-through.js
@@ -33,7 +33,7 @@ test('Piping through an identity transform stream will close the destination whe
const ws = new WritableStream();
- rs.pipeThrough(ts).pipeTo(ws).then(() => {
+ rs.pipeThrough(ts).pipeTo(ws).finished.then(() => {
t.equal(ws.state, 'closed', 'the writable stream was closed');
})
.catch(e => t.error(e));
@@ -82,7 +82,7 @@ test.skip('Piping through a default transform stream causes backpressure to be e
});
setTimeout(() => {
- rs.pipeThrough(ts).pipeTo(ws).then(() => {
+ rs.pipeThrough(ts).pipeTo(ws).finished.then(() => {
t.deepEqual(
enqueueReturnValues,
[true, true, true, true, false, false, false, false],
diff --git a/reference-implementation/test/pipe-to-options.js b/reference-implementation/test/pipe-to-options.js
index dc50e7635..b08cc5747 100644
--- a/reference-implementation/test/pipe-to-options.js
+++ b/reference-implementation/test/pipe-to-options.js
@@ -80,11 +80,11 @@ test('Piping with { preventCancel: true } and a destination error', t => {
}
});
- rs.pipeTo(ws, { preventCancel: true }).catch(e => {
- t.equal(e, theError, 'rejection reason of pipeTo promise is the sink error');
+ rs.pipeTo(ws, { preventCancel: true }).finished.catch(e => {
+ t.equal(e, theError, 'pipeTo finished promise should reject with the sink error');
let reader;
- t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo completes');
+ t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo finishes');
// { value: 'c', done: false } gets consumed before we know that ws has errored, and so is lost.
diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js
index 5b7427e10..11a6840da 100644
--- a/reference-implementation/test/pipe-to.js
+++ b/reference-implementation/test/pipe-to.js
@@ -30,7 +30,7 @@ test('Piping from a ReadableStream from which lots of data are readable synchron
});
let pipeFinished = false;
- rs.pipeTo(ws).then(
+ rs.pipeTo(ws).finished.then(
() => {
pipeFinished = true;
t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes');
@@ -75,11 +75,11 @@ test('Piping from a ReadableStream in readable state to a WritableStream in clos
rsClosed = true;
});
- rs.pipeTo(ws).then(
- () => t.fail('promise returned by pipeTo should not fulfill'),
+ rs.pipeTo(ws).finished.then(
+ () => t.fail('pipeTo finished promise should not fulfill'),
r => {
t.equal(r, cancelReason,
- 'the pipeTo promise should reject with the same error as the underlying source cancel was called with');
+ 'pipeTo finished promise should reject with the same error as the underlying source cancel was called with');
t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes');
}
);
@@ -135,8 +135,8 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro
ws.ready.then(() => {
t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state');
- rs.pipeTo(ws).catch(e => {
- t.equal(e, passedError, 'pipeTo promise should be rejected with the error');
+ rs.pipeTo(ws).finished.catch(e => {
+ t.equal(e, passedError, 'pipeTo finished promise should be rejected with the error');
t.assert(cancelCalled, 'cancel should have been called');
t.end();
});
@@ -187,11 +187,11 @@ test('Piping from a ReadableStream in the readable state which becomes closed af
});
startPromise.then(() => {
- rs.pipeTo(ws).then(() => {
- t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo completes');
+ rs.pipeTo(ws).finished.then(() => {
+ t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo finishes');
});
- t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo');
+ t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo call');
closeReadableStream();
});
@@ -235,12 +235,12 @@ test('Piping from a ReadableStream in the readable state which becomes errored a
});
startPromise.then(() => {
- rs.pipeTo(ws).catch(e => {
- t.equal(e, passedError, 'pipeTo should be rejected with the passed error');
- t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo completes');
+ rs.pipeTo(ws).finished.catch(e => {
+ t.equal(e, passedError, 'pipeTo finished promise should be rejected with the passed error');
+ t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo finishes');
});
- t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo');
+ t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo call');
errorReadableStream(passedError);
});
@@ -276,7 +276,7 @@ test('Piping from an empty ReadableStream which becomes non-empty after pipeTo c
}
});
- rs.pipeTo(ws).then(() => t.fail('pipeTo promise should not fulfill'));
+ rs.pipeTo(ws).finished.then(() => t.fail('pipeTo finished promise should not fulfill'));
t.equal(ws.state, 'writable', 'writable stream should start in writable state');
enqueue('Hello');
@@ -312,7 +312,9 @@ test('Piping from an empty ReadableStream which becomes errored after pipeTo cal
}
});
- rs.pipeTo(ws).catch(e => t.equal(e, passedError, 'pipeTo should reject with the passed error'));
+ rs.pipeTo(ws).finished.catch(
+ e => t.equal(e, passedError, 'pipeTo finished promise should reject with the passed error')
+ );
t.equal(ws.state, 'writable', 'writable stream should start out writable');
errorReadableStream(passedError);
});
@@ -355,7 +357,9 @@ test('Piping from an empty ReadableStream to a WritableStream in the writable st
startPromise.then(() => {
t.equal(ws.state, 'writable', 'ws should start writable');
- rs.pipeTo(ws).catch(e => t.equal(e, theError, 'pipeTo should reject with the passed error'));
+ rs.pipeTo(ws).finished.catch(
+ e => t.equal(e, theError, 'pipeTo finished promise should reject with the passed error')
+ );
t.equal(ws.state, 'writable', 'ws should be writable after pipe');
errorWritableStream(theError);
@@ -788,9 +792,9 @@ test('Piping to a stream that has been aborted passes through the error as the c
const passedReason = new Error('I don\'t like you.');
ws.abort(passedReason);
- rs.pipeTo(ws).catch(e => {
- t.equal(e, passedReason, 'pipeTo rejection reason should be the cancellation reason');
- t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason');
+ rs.pipeTo(ws).finished.catch(e => {
+ t.equal(e, passedReason, 'pipeTo finished promise should reject with the cancellation reason');
+ t.equal(recordedReason, passedReason, 'the recorded cancellation reason should be the passed abort reason');
t.end();
});
});
@@ -809,9 +813,9 @@ test('Piping to a stream and then aborting it passes through the error as the ca
const pipeToPromise = rs.pipeTo(ws);
ws.abort(passedReason);
- pipeToPromise.catch(e => {
- t.equal(e, passedReason, 'pipeTo rejection reason should be the abortion reason');
- t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason');
+ pipeToPromise.finished.catch(e => {
+ t.equal(e, passedReason, 'pipeTo finished promise should reject with the abortion reason');
+ t.equal(recordedReason, passedReason, 'the recorded cancellation reason should be the passed abort reason');
t.end();
});
});
@@ -827,8 +831,8 @@ test('Piping to a stream that has been closed propagates a TypeError cancellatio
const ws = new WritableStream();
ws.close();
- rs.pipeTo(ws).catch(e => {
- t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError');
+ rs.pipeTo(ws).finished.catch(e => {
+ t.equal(e.constructor, TypeError, 'pipeTo finished promise should reject with a TypeError');
t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError');
t.end();
});
@@ -844,11 +848,11 @@ test('Piping to a stream and then closing it propagates a TypeError cancellation
const ws = new WritableStream();
- const pipeToPromise = rs.pipeTo(ws);
+ const pipeToPromise = rs.pipeTo(ws).finished;
ws.close();
pipeToPromise.catch(e => {
- t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError');
+ t.equal(e.constructor, TypeError, 'pipeTo finished promise should reject with a TypeError');
t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError');
t.end();
});
@@ -913,10 +917,10 @@ test('Piping to a stream that errors on write should not pass through the error
}
});
- rs.pipeTo(ws).then(
- () => t.fail('pipeTo should not fulfill'),
+ rs.pipeTo(ws).finished.then(
+ () => t.fail('pipeTo finished promise should not fulfill'),
r => {
- t.equal(r, passedError, 'pipeTo should reject with the same error as the write');
+ t.equal(r, passedError, 'pipeTo finished promise should reject with the same error as the write');
t.equal(cancelCalled, false, 'cancel should not have been called');
t.end();
}
@@ -986,7 +990,7 @@ test('Piping to a writable stream that does not consume the writes fast enough e
});
startPromise.then(() => {
- rs.pipeTo(ws).then(() => {
+ rs.pipeTo(ws).finished.then(() => {
t.deepEqual(enqueueReturnValues, [true, true, true, false], 'backpressure was correctly exerted at the source');
t.deepEqual(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks were written');
t.end();
diff --git a/reference-implementation/test/templated/readable-stream-closed.js b/reference-implementation/test/templated/readable-stream-closed.js
index 134b30613..f4b15e20f 100644
--- a/reference-implementation/test/templated/readable-stream-closed.js
+++ b/reference-implementation/test/templated/readable-stream-closed.js
@@ -60,8 +60,8 @@ export default (label, factory) => {
startPromise.then(() => {
t.equal(ws.state, 'writable', 'writable stream should start in writable state');
- rs.pipeTo(ws).then(() => {
- t.pass('pipeTo promise should be fulfilled');
+ rs.pipeTo(ws).finished.then(() => {
+ t.pass('pipeTo finished promise should be fulfilled');
t.equal(ws.state, 'closed', 'writable stream should become closed');
});
});
diff --git a/reference-implementation/test/templated/readable-stream-errored-async-only.js b/reference-implementation/test/templated/readable-stream-errored-async-only.js
index 8ec30721d..86907372c 100644
--- a/reference-implementation/test/templated/readable-stream-errored-async-only.js
+++ b/reference-implementation/test/templated/readable-stream-errored-async-only.js
@@ -15,9 +15,9 @@ export default (label, factory, error) => {
}
});
- rs.pipeTo(ws).catch(e => {
+ rs.pipeTo(ws).finished.catch(e => {
t.equal(ws.state, 'errored', 'destination should be errored');
- t.equal(e, error, 'rejection reason of pipeToPromise should be the source error');
+ t.equal(e, error, 'pipeTo finished promise should reject with the source error');
});
ws.closed.catch(e => t.equal(e, error), 'rejection reason of dest closed should be the source error');
@@ -33,9 +33,9 @@ export default (label, factory, error) => {
}
});
- rs.pipeTo(ws, { preventAbort: false }).catch(e => {
+ rs.pipeTo(ws, { preventAbort: false }).finished.catch(e => {
t.equal(ws.state, 'errored', 'destination should be errored');
- t.equal(e, error, 'rejection reason of pipeToPromise should be the source error');
+ t.equal(e, error, 'pipeTo finished promise should reject with the source error');
});
ws.closed.catch(e => t.equal(e, error), 'rejection reason of dest closed should be the source error');
@@ -51,9 +51,9 @@ export default (label, factory, error) => {
}
});
- rs.pipeTo(ws, { preventAbort: true }).catch(e => {
+ rs.pipeTo(ws, { preventAbort: true }).finished.catch(e => {
t.equal(ws.state, 'writable', 'destination should remain writable');
- t.equal(e, error, 'rejection reason of pipeToPromise should be the source error');
+ t.equal(e, error, 'pipeTo finished promise should reject with the source error');
});
});
};
diff --git a/reference-implementation/test/templated/readable-stream-errored.js b/reference-implementation/test/templated/readable-stream-errored.js
index 2866f9a3b..3aeb00382 100644
--- a/reference-implementation/test/templated/readable-stream-errored.js
+++ b/reference-implementation/test/templated/readable-stream-errored.js
@@ -39,16 +39,33 @@ export default (label, factory, error) => {
startPromise.then(() => {
t.equal(ws.state, 'writable');
- rs.pipeTo(ws).then(
- () => t.fail('pipeTo promise should not be fulfilled'),
+ rs.pipeTo(ws).finished.then(
+ () => t.fail('pipeTo finished promise should not be fulfilled'),
e => {
- t.equal(e, error, 'pipeTo promise should be rejected with the passed error');
+ t.equal(e, error, 'pipeTo finished promise should be rejected with the passed error');
t.equal(ws.state, 'errored', 'writable stream should become errored');
}
);
});
});
+ test('unpiping should be a no-op after the pipe fails', t => {
+ t.plan(2);
+
+ const rs = factory();
+ const ws = new WritableStream();
+ const pipe = rs.pipeTo(ws);
+
+ pipe.finished.catch(e => {
+ t.equal(e, error, 'pipeTo finished promise should be rejected with the passed error');
+
+ return pipe.unpipe().then(v => {
+ t.equal(v, undefined, 'unpipe() should fulfill with undefined');
+ });
+ })
+ .catch(e => t.error(e));
+ });
+
test('getReader() should return a reader that acts errored', t => {
t.plan(2);
const rs = factory();
diff --git a/reference-implementation/test/templated/readable-stream-two-chunks-closed.js b/reference-implementation/test/templated/readable-stream-two-chunks-closed.js
index 2793dfd13..907c8fa83 100644
--- a/reference-implementation/test/templated/readable-stream-two-chunks-closed.js
+++ b/reference-implementation/test/templated/readable-stream-two-chunks-closed.js
@@ -18,7 +18,7 @@ export default (label, factory, chunks) => {
}
});
- rs.pipeTo(ws).then(() => {
+ rs.pipeTo(ws).finished.then(() => {
t.equal(ws.state, 'closed', 'destination should be closed');
t.deepEqual(chunksWritten, chunks);
t.end();
@@ -38,7 +38,7 @@ export default (label, factory, chunks) => {
}
});
- rs.pipeTo(ws).then(() => {
+ rs.pipeTo(ws).finished.then(() => {
t.equal(ws.state, 'closed', 'destination should be closed');
t.deepEqual(chunksWritten, chunks);
t.end();
@@ -61,11 +61,52 @@ export default (label, factory, chunks) => {
}
});
- rs.pipeTo(ws, { preventClose: true }).then(() => {
+ rs.pipeTo(ws, { preventClose: true }).finished.then(() => {
t.equal(ws.state, 'writable', 'destination should be writable');
t.deepEqual(chunksWritten, chunks);
t.end();
});
});
+
+ test('piping and then immediately unpiping', t => {
+ t.plan(5);
+ const rs = factory();
+
+ const chunksWritten = [];
+ const ws = new WritableStream({
+ close() {
+ t.fail('unexpected close call');
+ },
+ abort() {
+ t.fail('unexpected abort call');
+ },
+ write(chunk) {
+ chunksWritten.push(chunk);
+ }
+ });
+
+ const pipe = rs.pipeTo(ws);
+
+ let unpipeFulfilled = false;
+
+ pipe.unpipe().then(() => {
+ unpipeFulfilled = true;
+
+ let reader;
+ t.doesNotThrow(() => { reader = rs.getReader(); },
+ 'should be able to get a reader after unpipe promise fulfills');
+
+ reader.read().then(r => {
+ t.deepEqual(r, { value: chunks[1], done: false }, 'reading from the reader should give the second chunk');
+ });
+ });
+
+ pipe.finished.then(v => {
+ t.equal(v, undefined, 'pipeTo finished promise should fulfill with undefined');
+ t.equal(unpipeFulfilled, false, 'pipeTo finished promise should fulfill before the unpipe promise');
+ });
+
+ t.throws(() => rs.getReader(), TypeError, 'should not be able to get a reader immediately after unpipe call');
+ });
};