Skip to content

Commit

Permalink
Update pipeTo to allow unpiping
Browse files Browse the repository at this point in the history
See discussion in #297. pipeTo now returns a { finished, unpipe() } object, instead of just a promise.
  • Loading branch information
domenic committed Mar 17, 2015
1 parent 17969ed commit 726b08d
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 64 deletions.
8 changes: 4 additions & 4 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ longer active. At this point another reader can be acquired at will.
the chain:

<pre><code class="lang-javascript">
readableStream.pipeTo(writableStream)
readableStream.pipeTo(writableStream).finished
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
</code></pre>
Expand Down Expand Up @@ -924,7 +924,7 @@ a variable <var>stream</var>, that performs the following steps:
chance to slow down its data production.

<pre><code class="lang-javascript">
readableStream.pipeTo(writableStream)
readableStream.pipeTo(writableStream).finished
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
</code></pre>
Expand Down Expand Up @@ -1839,7 +1839,7 @@ writable stream:
<pre><code class="lang-javascript">
const webSocketStream = makeReadableWebSocketStream("wss://example.com", 443);

webSocketStream.pipeTo(writableStream)
webSocketStream.pipeTo(writableStream).finished
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
</code></pre>
Expand Down Expand Up @@ -1972,7 +1972,7 @@ We can then use this function to create writable streams for a web socket, and p
<pre><code class="lang-javascript">
const webSocketStream = makeWritableWebSocketStream("wss://example.com", 443);

readableStream.pipeTo(webSocketStream)
readableStream.pipeTo(webSocketStream).finished
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
</code></pre>
Expand Down
49 changes: 39 additions & 10 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ export default class ReadableStream {
let reader;
let lastRead;
let closedPurposefully = false;
let resolvePipeToPromise;
let rejectPipeToPromise;
let unpiped = false;
let resolveFinishedPromise;
let rejectFinishedPromise;

return new Promise((resolve, reject) => {
resolvePipeToPromise = resolve;
rejectPipeToPromise = reject;
const finishedPromise = new Promise((resolve, reject) => {
resolveFinishedPromise = resolve;
rejectFinishedPromise = reject;

reader = source.getReader();

Expand All @@ -94,9 +95,25 @@ export default class ReadableStream {
doPipe();
});

return { finished: finishedPromise, unpipe };

function unpipe() {
unpiped = true;
return lastRead.then(finishUnpipe, finishUnpipe);

function finishUnpipe() {
reader.releaseLock();
resolveFinishedPromise(undefined);
}
}

function doPipe() {
lastRead = reader.read();
Promise.all([lastRead, dest.ready]).then(([{ value, done }]) => {
if (unpiped === true) {
return;
}

if (Boolean(done) === true) {
closeDest();
} else if (dest.state === 'writable') {
Expand All @@ -110,44 +127,56 @@ export default class ReadableStream {
}

function cancelSource(reason) {
if (unpiped === true) {
return;
}

if (preventCancel === false) {
reader.cancel(reason);
reader.releaseLock();
rejectPipeToPromise(reason);
rejectFinishedPromise(reason);
} else {
// If we don't cancel, we need to wait for lastRead to finish before we're allowed to release.
// We don't need to handle lastRead failing because that will trigger abortDest which takes care of
// both of these.
lastRead.then(() => {
reader.releaseLock();
rejectPipeToPromise(reason);
rejectFinishedPromise(reason);
});
}
}

function closeDest() {
if (unpiped === true) {
return;
}

// Does not need to wait for lastRead since it occurs only on source closed.

reader.releaseLock();

const destState = dest.state;
if (preventClose === false && (destState === 'waiting' || destState === 'writable')) {
closedPurposefully = true;
dest.close().then(resolvePipeToPromise, rejectPipeToPromise);
dest.close().then(resolveFinishedPromise, rejectFinishedPromise);
} else {
resolvePipeToPromise();
resolveFinishedPromise();
}
}

function abortDest(reason) {
if (unpiped === true) {
return;
}

// Does not need to wait for lastRead since it only occurs on source errored.

reader.releaseLock();

if (preventAbort === false) {
dest.abort(reason);
}
rejectPipeToPromise(reason);
rejectFinishedPromise(reason);
}
}

Expand Down
4 changes: 3 additions & 1 deletion reference-implementation/test/brand-checks.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ function fakeReadableStream() {
get closed() { return Promise.resolve(); },
cancel(reason) { return Promise.resolve(); },
pipeThrough({ writable, readable }, options) { return readable; },
pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { return Promise.resolve(); },
pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) {
return { finished: Promise.resolve(), unpipe() { } };
},
getReader() { return new ReadableStream(new ReadableStream()); }
};
}
Expand Down
4 changes: 2 additions & 2 deletions reference-implementation/test/pipe-through.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ test('Piping through an identity transform stream will close the destination whe

const ws = new WritableStream();

rs.pipeThrough(ts).pipeTo(ws).then(() => {
rs.pipeThrough(ts).pipeTo(ws).finished.then(() => {
t.equal(ws.state, 'closed', 'the writable stream was closed');
})
.catch(e => t.error(e));
Expand Down Expand Up @@ -82,7 +82,7 @@ test.skip('Piping through a default transform stream causes backpressure to be e
});

setTimeout(() => {
rs.pipeThrough(ts).pipeTo(ws).then(() => {
rs.pipeThrough(ts).pipeTo(ws).finished.then(() => {
t.deepEqual(
enqueueReturnValues,
[true, true, true, true, false, false, false, false],
Expand Down
6 changes: 3 additions & 3 deletions reference-implementation/test/pipe-to-options.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ test('Piping with { preventCancel: true } and a destination error', t => {
}
});

rs.pipeTo(ws, { preventCancel: true }).catch(e => {
t.equal(e, theError, 'rejection reason of pipeTo promise is the sink error');
rs.pipeTo(ws, { preventCancel: true }).finished.catch(e => {
t.equal(e, theError, 'pipeTo finished promise should reject with the sink error');

let reader;
t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo completes');
t.doesNotThrow(() => { reader = rs.getReader(); }, 'should be able to get a stream reader after pipeTo finishes');

// { value: 'c', done: false } gets consumed before we know that ws has errored, and so is lost.

Expand Down
64 changes: 34 additions & 30 deletions reference-implementation/test/pipe-to.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ test('Piping from a ReadableStream from which lots of data are readable synchron
});

let pipeFinished = false;
rs.pipeTo(ws).then(
rs.pipeTo(ws).finished.then(
() => {
pipeFinished = true;
t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes');
Expand Down Expand Up @@ -75,11 +75,11 @@ test('Piping from a ReadableStream in readable state to a WritableStream in clos
rsClosed = true;
});

rs.pipeTo(ws).then(
() => t.fail('promise returned by pipeTo should not fulfill'),
rs.pipeTo(ws).finished.then(
() => t.fail('pipeTo finished promise should not fulfill'),
r => {
t.equal(r, cancelReason,
'the pipeTo promise should reject with the same error as the underlying source cancel was called with');
'pipeTo finished promise should reject with the same error as the underlying source cancel was called with');
t.equal(rsClosed, true, 'readable stream should be closed after pipe finishes');
}
);
Expand Down Expand Up @@ -135,8 +135,8 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro
ws.ready.then(() => {
t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state');

rs.pipeTo(ws).catch(e => {
t.equal(e, passedError, 'pipeTo promise should be rejected with the error');
rs.pipeTo(ws).finished.catch(e => {
t.equal(e, passedError, 'pipeTo finished promise should be rejected with the error');
t.assert(cancelCalled, 'cancel should have been called');
t.end();
});
Expand Down Expand Up @@ -187,11 +187,11 @@ test('Piping from a ReadableStream in the readable state which becomes closed af
});

startPromise.then(() => {
rs.pipeTo(ws).then(() => {
t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo completes');
rs.pipeTo(ws).finished.then(() => {
t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo finishes');
});

t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo');
t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo call');

closeReadableStream();
});
Expand Down Expand Up @@ -235,12 +235,12 @@ test('Piping from a ReadableStream in the readable state which becomes errored a
});

startPromise.then(() => {
rs.pipeTo(ws).catch(e => {
t.equal(e, passedError, 'pipeTo should be rejected with the passed error');
t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo completes');
rs.pipeTo(ws).finished.catch(e => {
t.equal(e, passedError, 'pipeTo finished promise should be rejected with the passed error');
t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo finishes');
});

t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo');
t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo call');

errorReadableStream(passedError);
});
Expand Down Expand Up @@ -276,7 +276,7 @@ test('Piping from an empty ReadableStream which becomes non-empty after pipeTo c
}
});

rs.pipeTo(ws).then(() => t.fail('pipeTo promise should not fulfill'));
rs.pipeTo(ws).finished.then(() => t.fail('pipeTo finished promise should not fulfill'));
t.equal(ws.state, 'writable', 'writable stream should start in writable state');

enqueue('Hello');
Expand Down Expand Up @@ -312,7 +312,9 @@ test('Piping from an empty ReadableStream which becomes errored after pipeTo cal
}
});

rs.pipeTo(ws).catch(e => t.equal(e, passedError, 'pipeTo should reject with the passed error'));
rs.pipeTo(ws).finished.catch(
e => t.equal(e, passedError, 'pipeTo finished promise should reject with the passed error')
);
t.equal(ws.state, 'writable', 'writable stream should start out writable');
errorReadableStream(passedError);
});
Expand Down Expand Up @@ -355,7 +357,9 @@ test('Piping from an empty ReadableStream to a WritableStream in the writable st
startPromise.then(() => {
t.equal(ws.state, 'writable', 'ws should start writable');

rs.pipeTo(ws).catch(e => t.equal(e, theError, 'pipeTo should reject with the passed error'));
rs.pipeTo(ws).finished.catch(
e => t.equal(e, theError, 'pipeTo finished promise should reject with the passed error')
);
t.equal(ws.state, 'writable', 'ws should be writable after pipe');

errorWritableStream(theError);
Expand Down Expand Up @@ -788,9 +792,9 @@ test('Piping to a stream that has been aborted passes through the error as the c
const passedReason = new Error('I don\'t like you.');
ws.abort(passedReason);

rs.pipeTo(ws).catch(e => {
t.equal(e, passedReason, 'pipeTo rejection reason should be the cancellation reason');
t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason');
rs.pipeTo(ws).finished.catch(e => {
t.equal(e, passedReason, 'pipeTo finished promise should reject with the cancellation reason');
t.equal(recordedReason, passedReason, 'the recorded cancellation reason should be the passed abort reason');
t.end();
});
});
Expand All @@ -809,9 +813,9 @@ test('Piping to a stream and then aborting it passes through the error as the ca
const pipeToPromise = rs.pipeTo(ws);
ws.abort(passedReason);

pipeToPromise.catch(e => {
t.equal(e, passedReason, 'pipeTo rejection reason should be the abortion reason');
t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason');
pipeToPromise.finished.catch(e => {
t.equal(e, passedReason, 'pipeTo finished promise should reject with the abortion reason');
t.equal(recordedReason, passedReason, 'the recorded cancellation reason should be the passed abort reason');
t.end();
});
});
Expand All @@ -827,8 +831,8 @@ test('Piping to a stream that has been closed propagates a TypeError cancellatio
const ws = new WritableStream();
ws.close();

rs.pipeTo(ws).catch(e => {
t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError');
rs.pipeTo(ws).finished.catch(e => {
t.equal(e.constructor, TypeError, 'pipeTo finished promise should reject with a TypeError');
t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError');
t.end();
});
Expand All @@ -844,11 +848,11 @@ test('Piping to a stream and then closing it propagates a TypeError cancellation

const ws = new WritableStream();

const pipeToPromise = rs.pipeTo(ws);
const pipeToPromise = rs.pipeTo(ws).finished;
ws.close();

pipeToPromise.catch(e => {
t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError');
t.equal(e.constructor, TypeError, 'pipeTo finished promise should reject with a TypeError');
t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError');
t.end();
});
Expand Down Expand Up @@ -913,10 +917,10 @@ test('Piping to a stream that errors on write should not pass through the error
}
});

rs.pipeTo(ws).then(
() => t.fail('pipeTo should not fulfill'),
rs.pipeTo(ws).finished.then(
() => t.fail('pipeTo finished promise should not fulfill'),
r => {
t.equal(r, passedError, 'pipeTo should reject with the same error as the write');
t.equal(r, passedError, 'pipeTo finished promise should reject with the same error as the write');
t.equal(cancelCalled, false, 'cancel should not have been called');
t.end();
}
Expand Down Expand Up @@ -986,7 +990,7 @@ test('Piping to a writable stream that does not consume the writes fast enough e
});

startPromise.then(() => {
rs.pipeTo(ws).then(() => {
rs.pipeTo(ws).finished.then(() => {
t.deepEqual(enqueueReturnValues, [true, true, true, false], 'backpressure was correctly exerted at the source');
t.deepEqual(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks were written');
t.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ export default (label, factory) => {
startPromise.then(() => {
t.equal(ws.state, 'writable', 'writable stream should start in writable state');

rs.pipeTo(ws).then(() => {
t.pass('pipeTo promise should be fulfilled');
rs.pipeTo(ws).finished.then(() => {
t.pass('pipeTo finished promise should be fulfilled');
t.equal(ws.state, 'closed', 'writable stream should become closed');
});
});
Expand Down
Loading

0 comments on commit 726b08d

Please sign in to comment.