diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 46c4ad09b..d19ac701e 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -86,95 +86,272 @@ class ReadableStream { } pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { + // brandcheck + preventClose = Boolean(preventClose); preventAbort = Boolean(preventAbort); preventCancel = Boolean(preventCancel); const source = this; - let reader; - let lastRead; - let lastWrite; - let closedPurposefully = false; - let resolvePipeToPromise; - let rejectPipeToPromise; + let _resolvePipeToPromise; + let _rejectPipeToPromise; + + let _reader; + let _writer; + + let _state = 'piping'; + + let _lastRead; + let _lastWrite; + let _allWrites; return new Promise((resolve, reject) => { - resolvePipeToPromise = resolve; - rejectPipeToPromise = reject; + _resolvePipeToPromise = resolve; + _rejectPipeToPromise = reject; - reader = source.getReader(); + _reader = source.getReader(); + _writer = dest.getWriter(); - reader.closed.catch(abortDest); - dest.closed.then( - () => { - if (!closedPurposefully) { - cancelSource(new TypeError('destination is closing or closed and cannot be piped to anymore')); - } - }, - cancelSource + _reader.closed.catch(handleReaderClosedRejection); + _writer.closed.then( + handleWriterClosedFulfillment, + handleWriterClosedRejection ); doPipe(); }); + function releaseReader() { + console.log('pipeTo(): releaseReader()'); + + _reader.releaseLock(); + _reader = undefined; + } + + function releaseWriter() { + console.log('pipeTo(): releaseWriter()'); + + _writer.releaseLock(); + _writer = undefined; + } + + function done() { + console.log('pipeTo(): done()'); + + assert(_reader === undefined); + assert(_writer === undefined); + + _state = 'done'; + + _lastRead = undefined; + _lastWrite = undefined; + _allWrites = undefined; + } + + function finishWithFulfillment() { + console.log('pipeTo(): finishWithFulfillment()'); + + _resolvePipeToPromise(undefined); + _resolvePipeToPromise = undefined; + _rejectPipeToPromise = undefined; + + done(); + } + + function finishWithRejection(reason) { + console.log('pipeTo(): finishWithRejection()'); + + _rejectPipeToPromise(reason); + _resolvePipeToPromise = undefined; + _rejectPipeToPromise = undefined; + + done(); + } + + function abortWriterCancelReader(reason, skipAbort, skipCancel) { + const promises = []; + + if (skipAbort === false) { + _writer.abort(reason); + + releaseWriter(); + } else if (_lastWrite === undefined) { + releaseWriter(); + } else { + promises.push(_lastWrite.then( + () => { + releaseWriter(); + }, + () => { + releaseWriter(); + } + )); + } + + if (skipCancel === false) { + _reader.cancel(reason); + + releaseReader(); + } else if (_lastRead === undefined) { + releaseReader(); + } else { + promises.push(_lastRead.then( + () => { + releaseReader(); + }, + () => { + releaseReader(); + } + )); + } + + if (promises.length > 0) { + Promise.all(promises).then( + () => { + finishWithRejection(reason); + } + ); + _state = 'waitingForLastReadAndOrLastWrite'; + return; + } + + finishWithRejection(reason); + } + + function handleWriteRejection(reason) { + console.log('pipeTo(): handleWriteRejection()'); + + if (_state !== 'piping') { + return; + } + + abortWriterCancelReader(reason, preventAbort, preventCancel); + } + + function handleReadValue(value) { + console.log('pipeTo(): handleReadValue()'); + + _lastWrite = _writer.write(value); + _lastWrite.catch(handleWriteRejection); + + // dest may be already errored. But proceed to write(). + _allWrites = Promise.all([_allWrites, _lastWrite]); + + doPipe(); + } + + function handleReadDone() { + console.log('pipeTo(): handleReadDone()'); + + // Does not need to wait for lastRead since it occurs only on source closed. + + releaseReader(); + + if (preventClose === false) { + console.log('pipeTo(): Close dest'); + + // We don't use writer.closed. We can ensure that the microtask for writer.closed is run before any + // writer.close() call so that we can determine whether the closure was caused by the close() or ws was already + // closed before pipeTo(). It's possible but fragile. + _writer.close().then( + () => { + return _allWrites; + }, + reason => { + releaseWriter(); + finishWithRejection(reason); + } + ).then( + () => { + releaseWriter(); + finishWithFulfillment(); + } + ); + _state = 'closingDest'; + + return; + } + + if (_lastWrite === undefined) { + releaseWriter() + finishWithFulfillment(); + return; + } + + // We don't use writer.closed. pipeTo() is responsible only for what it has written. + _lastWrite.then( + () => { + releaseWriter(); + finishWithFulfillment(); + }, + reason => { + releaseWriter(); + finishWithRejection(reason) + } + ); + _state = 'waitingLastWriteOnReadableClosed'; + } + function doPipe() { - lastRead = reader.read(); - Promise.all([lastRead, dest.ready]).then(([{ value, done }]) => { - if (Boolean(done) === true) { - closeDest(); - } else if (dest.state === 'writable') { - lastWrite = dest.write(value); - doPipe(); + console.log('pipeTo(): doPipe()'); + + _lastRead = _reader.read(); + + Promise.all([_lastRead, _writer.ready]).then( + ([{ value, done }]) => { + if (_state !== 'piping') { + return; + } + + if (Boolean(done) === false) { + handleReadValue(value); + } else { + handleReadDone(); + } + }, + () => { + // Do nothing } - }) + ) .catch(rethrowAssertionErrorRejection); // Any failures will be handled by listening to reader.closed and dest.closed above. // TODO: handle malicious dest.write/dest.close? } - function cancelSource(reason) { - if (preventCancel === false) { - reader.cancel(reason); - reader.releaseLock(); - rejectPipeToPromise(reason); - } else { - // If we don't cancel, we need to wait for lastRead to finish before we're allowed to release. - // We don't need to handle lastRead failing because that will trigger abortDest which takes care of - // both of these. - lastRead.then(() => { - reader.releaseLock(); - rejectPipeToPromise(reason); - }); + function handleReaderClosedRejection(reason) { + console.log('pipeTo(): handleReaderClosedRejection()'); + + if (_state !== 'piping') { + return; } - } - function closeDest() { - // Does not need to wait for lastRead since it occurs only on source closed. + _lastRead = undefined; + abortWriterCancelReader(reason, preventAbort, true); + } - reader.releaseLock(); + function handleUnexpectedWriterCloseAndError(reason) { + console.log('pipeTo(): handleUnexpectedWriterCloseAndError()'); - const destState = dest.state; - if (preventClose === false && (destState === 'waiting' || destState === 'writable')) { - closedPurposefully = true; - dest.close().then(resolvePipeToPromise, rejectPipeToPromise); - } else if (lastWrite !== undefined) { - lastWrite.then(resolvePipeToPromise, rejectPipeToPromise); - } else { - resolvePipeToPromise(); + if (_state !== 'piping') { + return; } + + _lastWrite = undefined; + abortWriterCancelReader(reason, true, preventCancel); } - function abortDest(reason) { - // Does not need to wait for lastRead since it only occurs on source errored. + function handleWriterClosedFulfillment() { + console.log('pipeTo(): handleWriterClosedFulfillment()'); - reader.releaseLock(); + handleUnexpectedWriterCloseAndError(new TypeError('dest closed unexpectedly')); + } - if (preventAbort === false) { - dest.abort(reason); - } - rejectPipeToPromise(reason); + function handleWriterClosedRejection(reason) { + console.log('pipeTo(): handleWriterClosedRejection()'); + + handleUnexpectedWriterCloseAndError(reason); } } diff --git a/reference-implementation/lib/transform-stream.js b/reference-implementation/lib/transform-stream.js index 82786c5e4..df487bd37 100644 --- a/reference-implementation/lib/transform-stream.js +++ b/reference-implementation/lib/transform-stream.js @@ -1,74 +1,337 @@ 'use strict'; +const assert = require('assert'); const { ReadableStream } = require('./readable-stream.js'); const { WritableStream } = require('./writable-stream.js'); +// Functions passed to the transformer.start(). + +function TransformStreamCloseReadable(transformStream) { + console.log('TransformStreamCloseReadable()'); + + if (transformStream._errored === true) { + throw new TypeError('TransformStream is already errored'); + } + + if (transformStream._readableClosed === true) { + throw new TypeError('Readable side is already closed'); + } + + try { + transformStream._readableController.close(); + } catch (e) { + assert(false); + } + + transformStream._readableClosed = true; +} + +function TransformStreamEnqueueToReadable(transformStream, chunk) { + if (transformStream._errroed === true) { + throw new TypeError('TransformStream is already errored'); + } + + if (transformStream._readableClosed === true) { + throw new TypeError('Readable side is already closed'); + } + + // We throttle transformer.transform invoation based on the backpressure of the ReadableStream, but we still + // accept TrasnformStreamEnqueueToReadable() calls. + + const controller = transformStream._readableController; + + transformStream._readableBackpressure = true; + + try { + controller.enqueue(chunk); + } catch (e) { + if (transformStream._error === false) { + // This happens when the given strategy is bad. + const reason = new TypeError('Failed to enqueue to readable side'); + TransformStreamErrorInternal(transformStream, reason); + } + throw reason; + } + + let backpressure; + try { + backpressure = controller.desiredSize <= 0; + } catch (e) { + if (transformStream._error === false) { + const reason = new TypeError('Failed to calculate backpressure of readable side'); + TransformStreamError(transformStream, reason); + } + throw reason; + } + + // enqueue() may invoke pull() synchronously when we're not in pull() call. + // In such case, _readableBackpressure may be already set to false. + if (backpressure) { + transformStream._readableBackpressure = false; + } +} + +function TransformStreamError(transformStream, e) { + if (transformStream._errored === true) { + throw new TypeError('TransformStream is already errored'); + } + + TransformStreamErrorInternal(transformStream, e); +} + +// Functions passed to transformer.transform(). + +function TransformStreamChunkDone(transformStream) { + console.log('sjsjs' + transformStream); + if (transformStream._errroed === true) { + throw new TypeError('TransformStream is already errored'); + } + + if (transformStream._transforming === false) { + throw new TypeError('No active transform is running'); + } + + assert(transformStream._resolveWrite !== undefined); + + transformStream._transforming = false; + + transformStream._resolveWrite(undefined); + transformStream._resolveWrite = undefined; + + TransformStreamTransformIfNeeded(transformStream); +} + +// Abstract operations. + +function TransformStreamErrorInternal(transformStream, e) { + console.log('TransformStreamErrorInternal()'); + + transformStream._errored = true; + + if (transformStream._writableDone === false) { + transformStream._writableController.error(e); + } + if (transformStream._readableClosed === false) { + transformStream._readableController.error(e); + } + + transformStream._chunk = undefined; + + if (transformStream._resolveWriter !== undefined) { + transformStream._resolveWriter(undefined); + } +} + +function TransformStreamTransformIfNeeded(transformStream) { + console.log('TransformStreamTransformIfNeeded()'); + + if (transformStream._chunkPending === false) { + return; + } + + assert(transformStream._resolveWrite !== undefined); + + if (transformStream._transforming === true) { + return; + } + + if (transformStream._readableBackpressure === true) { + return; + } + + transformStream._transforming = true; + + const chunk = transformStream._chunk; + transformStream._chunkPending = false; + transformStream._chunk = undefined; + + try { + if (transformStream._transformer.transform !== undefined) { + transformStream._transformer.transform( + chunk, + TransformStreamChunkDone.bind(undefined, transformStream), + transformStream._enqueueFunction, + transformStream._closeFunction, + transformStream._errorFunction); + } + } catch (e) { + if (transformStream._errored === false) { + TransformStreamErrorInternal(transformStream, e); + } + } +} + +function TransformStreamStart(transformStream) { + if (transformStream._transformer.start === undefined) { + return; + } + + // Thrown exception will be handled by the constructor of TransformStream. + transformStream._transformer.start( + transformStream._enqueueFunction, + transformStream._closeFunction, + transformStream._errorFunction); +} + +class TransformStreamSink { + constructor(transformStream) { + this._transformStream = transformStream; + } + + start(c) { + const transformStream = this._transformStream; + + transformStream._writableController = c; + + if (transformStream._readableController !== undefined) { + TransformStreamStart(transformStream); + } + } + + write(chunk) { + console.log('TransformStreamSink.write()'); + + const transformStream = this._transformStream; + + assert(transformStream._errored === false); + + assert(transformStream._chunkPending === false); + assert(transformStream._chunk === undefined); + + assert(transformStream._resolveWrite === undefined); + + transformStream._chunkPending = true; + transformStream._chunk = chunk; + + const promise = new Promise(resolve => { + transformStream._resolveWrite = resolve; + }); + + TransformStreamTransformIfNeeded(transformStream); + + return promise; + } + + abort(reason) { + const transformStream = this._transformStream; + transformStream._writableDone = true; + TransformStreamErrorInternal(transformStream, new TypeError('Writable side aborted')); + } + + close() { + console.log('TransformStreamSink.close()'); + + const transformStream = this._transformStream; + + assert(transformStream._chunkPending === false); + assert(transformStream._chunk === undefined); + + assert(transformStream._resolveWrite === undefined); + + assert(transformStream._transforming === false); + + // No control over the promise returned by writableStreamWriter.close(). Need it? + + transformStream._writableDone = true; + + if (transformStream._transformer.flush === undefined) { + TransformStreamCloseReadable(transformStream); + } else { + try { + transformStream._transformer.flush( + transformStream._enqueueFunction, + transformStream._closeFunction, + transformStream._errorFunction); + } catch (e) { + if (transformStream._errored === false) { + TransformStreamErrorInternal(transformStream, e); + throw e; + } + } + } + } +} + +class TransformStreamSource { + constructor(transformStream) { + this._transformStream = transformStream; + } + + start(c) { + const transformStream = this._transformStream; + + transformStream._readableController = c; + + if (transformStream._writableController !== undefined) { + TransformStreamStart(transformStream); + } + } + + pull() { + this._transformStream._readableBackpressure = false; + TransformStreamTransformIfNeeded(this._transformStream); + } + + cancel(reason) { + const transformStream = this._transformStream; + transformStream._readableClosed = true; + TransformStreamErrorInternal(transformStream, new TypeError('Readable side canceled')); + } +} + module.exports = class TransformStream { constructor(transformer) { - if (transformer.flush === undefined) { - transformer.flush = (enqueue, close) => close(); + if (transformer.start !== undefined && typeof transformer.start !== 'function') { + throw new TypeError('start must be a function or undefined'); } - if (typeof transformer.transform !== 'function') { throw new TypeError('transform must be a function'); } + if (transformer.flush !== undefined && typeof transformer.flush !== 'function') { + throw new TypeError('flush must be a function or undefined'); + } - let writeChunk, writeDone, errorWritable; - let transforming = false; - let chunkWrittenButNotYetTransformed = false; - this.writable = new WritableStream({ - start(error) { - errorWritable = error; - }, - write(chunk) { - writeChunk = chunk; - chunkWrittenButNotYetTransformed = true; - - const p = new Promise(resolve => writeDone = resolve); - maybeDoTransform(); - return p; - }, - close() { - try { - transformer.flush(enqueueInReadable, closeReadable); - } catch (e) { - errorWritable(e); - errorReadable(e); - } - } - }, transformer.writableStrategy); - - let enqueueInReadable, closeReadable, errorReadable; - this.readable = new ReadableStream({ - start(c) { - enqueueInReadable = c.enqueue.bind(c); - closeReadable = c.close.bind(c); - errorReadable = c.error.bind(c); - }, - pull() { - if (chunkWrittenButNotYetTransformed === true) { - maybeDoTransform(); - } - } - }, transformer.readableStrategy); - - function maybeDoTransform() { - if (transforming === false) { - transforming = true; - try { - transformer.transform(writeChunk, enqueueInReadable, transformDone); - writeChunk = undefined; - chunkWrittenButNotYetTransformed = false; - } catch (e) { - transforming = false; - errorWritable(e); - errorReadable(e); - } + this._transformer = transformer; + + this._transforming = false; + this._errored = false; + + this._writableController = undefined; + this._readableController = undefined; + + this._writableDone = false; + this._readableClosed = false; + + this._resolveWrite = undefined; + + this._chunkPending = false; + this._chunk = undefined; + + this._enqueueFunction = TransformStreamEnqueueToReadable.bind(undefined, this); + this._closeFunction = TransformStreamCloseReadable.bind(undefined, this); + this._errorFunction = TransformStreamError.bind(undefined, this); + + const sink = new TransformStreamSink(this); + + try { + this.writable = new WritableStream(sink, transformer.writableStrategy); + } catch (e) { + if (this._errored === false) { + TransformStreamError(this, e); + throw e; } + return; } - function transformDone() { - transforming = false; - writeDone(); + const source = new TransformStreamSource(this); + + try { + this.readable = new ReadableStream(source, transformer.readableStrategy); + } catch (e) { + this.writable = undefined; + if (this._errored === false) { + TransformStreamError(this, e); + throw e; + } } } }; diff --git a/reference-implementation/lib/writable-stream.js b/reference-implementation/lib/writable-stream.js index 4e4f2b863..f104a2af7 100644 --- a/reference-implementation/lib/writable-stream.js +++ b/reference-implementation/lib/writable-stream.js @@ -4,225 +4,661 @@ const { InvokeOrNoop, PromiseInvokeOrNoop, PromiseInvokeOrFallbackOrNoop, Valida typeIsObject } = require('./helpers.js'); const { rethrowAssertionErrorRejection } = require('./utils.js'); const { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize, PeekQueueValue } = require('./queue-with-sizes.js'); -const CountQueuingStrategy = require('./count-queuing-strategy.js'); class WritableStream { - constructor(underlyingSink = {}, { size, highWaterMark = 0 } = {}) { - this._underlyingSink = underlyingSink; + constructor(underlyingSink = {}, { size, highWaterMark } = {}) { + // Temporary value. Never used. To be overwritten by the initializer code of the controller. + this._state = 'writable'; + this._storedError = undefined; - this._closedPromise = new Promise((resolve, reject) => { - this._closedPromise_resolve = resolve; - this._closedPromise_reject = reject; - }); + this._writer = undefined; - this._readyPromise = Promise.resolve(undefined); - this._readyPromise_resolve = null; + this._writeRequests = []; - this._queue = []; - this._state = 'writable'; - this._started = false; - this._writing = false; + // Initialize to undefined first because the constructor of the controller checks this + // variable to validate the caller. + this._writableStreamController = undefined; - const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark); - this._strategySize = normalizedStrategy.size; - this._strategyHWM = normalizedStrategy.highWaterMark; + const type = underlyingSink.type; - SyncWritableStreamStateWithQueue(this); + if (type !== undefined) { + throw new RangeError('Invalid type is specified'); + } - const error = closure_WritableStreamErrorFunction(); - error._stream = this; + if (highWaterMark === undefined) { + highWaterMark = 1; + } - const startResult = InvokeOrNoop(underlyingSink, 'start', [error]); - this._startedPromise = Promise.resolve(startResult); - this._startedPromise.then(() => { - this._started = true; - this._startedPromise = undefined; - }); - this._startedPromise.catch(r => ErrorWritableStream(this, r)).catch(rethrowAssertionErrorRejection); + this._writableStreamController = new WritableStreamDefaultController(this, underlyingSink, size, highWaterMark); } - get closed() { - if (!IsWritableStream(this)) { - return Promise.reject(new TypeError('WritableStream.prototype.closed can only be used on a WritableStream')); + get locked() { + if (IsWritableStream(this) === false) { + throw CreateWritableStreamBrandCheckException('locked'); } - return this._closedPromise; + return IsWritableStreamLocked(this); } - get state() { - if (!IsWritableStream(this)) { - throw new TypeError('WritableStream.prototype.state can only be used on a WritableStream'); + abort(reason) { + if (IsWritableStream(this) === false) { + return Promise.reject(CreateWritableStreamBrandCheckException('abort')); + } + + if (IsWritableStreamLocked(this) === true) { + return Promise.reject(new TypeError('Cannot abort a stream that already has a reader')); } - return this._state; + WritableStreamAbort(this, reason); } - abort(reason) { - if (!IsWritableStream(this)) { - return Promise.reject(new TypeError('WritableStream.prototype.abort can only be used on a WritableStream')); + getWriter() { + if (IsWritableStream(this) === false) { + throw CreateWritableStreamBrandCheckException('getWriter'); } - if (this._state === 'closed') { - return Promise.resolve(undefined); - } - if (this._state === 'errored') { - return Promise.reject(this._storedError); + return AcquireWritableStreamDefaultWriter(this); + } +} + +exports.WritableStream = WritableStream; + +// Helper functions for the WritableStream. + +function CreateWritableStreamBrandCheckException(name) { + return new TypeError('WritableStream.prototype.' + name + ' can only be used on a WritableStream') +} + +// Abstract operations for the WritableStream. + +function AcquireWritableStreamDefaultWriter(stream) { + return new WritableStreamDefaultWriter(stream); +} + +function IsWritableStream(x) { + if (!typeIsObject(x)) { + return false; + } + + if (!Object.prototype.hasOwnProperty.call(x, '_writableStreamController')) { + return false; + } + + return true; +} + +function IsWritableStreamLocked(stream) { + assert(IsWritableStream(stream) === true, 'IsWritableStreamLocked should only be used on known writable streams'); + + if (stream._writer === undefined) { + return false; + } + + return true; +} + +function WritableStreamAbort(stream, reason) { + const state = stream._state; + if (state === 'closed') { + return Promise.resolve(undefined); + } + if (state === 'errored') { + return Promise.reject(stream._storedError); + } + + assert(state === 'writable' || state === 'waiting' || state === 'closing'); + + const writer = stream._writer; + + const error = new TypeError('Aborted'); + + for (const writeRequest of stream._writeRequests) { + writeRequest._reject(error); + } + + if (writer !== undefined) { + WritableStreamDefaultWriterClosedPromiseReject(writer, error); + + if (state === 'waiting') { + WritableStreamDefaultWriterReadyPromiseResolve(writer, undefined); } + } + + stream._state = 'errored'; + stream._storedError = error; - ErrorWritableStream(this, reason); - const sinkAbortPromise = PromiseInvokeOrFallbackOrNoop(this._underlyingSink, 'abort', [reason], 'close', []); - return sinkAbortPromise.then(() => undefined); + return WritableStreamDefaultControllerAbort(stream._writableStreamController, reason); +} + +// WritableStream API exposed for controllers. + +function WritableStreamAddWriteRequest(stream) { + const writer = stream._writer; + assert(IsWritableStreamDefaultWriter(writer) === true); + + const state = stream._state; + assert(state === 'writable' || state === 'waiting'); + + const promise = new Promise((resolve, reject) => { + const writeRequest = { + _resolve: resolve, + _reject: reject + }; + + stream._writeRequests.push(writeRequest); + }); + + return promise; +} + +function WritableStreamError(stream, e) { + const state = stream._state; + assert(state === 'writable' || state === 'waiting' || state === 'closing'); + + const writeRequests = stream._writeRequests; + while (writeRequests.length > 0) { + const writeRequest = writeRequests.shift(); + writeRequest._reject(e); } - close() { - if (!IsWritableStream(this)) { - return Promise.reject(new TypeError('WritableStream.prototype.close can only be used on a WritableStream')); + const writer = stream._writer; + + if (writer !== undefined) { + WritableStreamDefaultWriterClosedPromiseReject(writer, e); + + if (state === 'waiting') { + WritableStreamDefaultWriterReadyPromiseResolve(writer, undefined); } + } - if (this._state === 'closing') { - return Promise.reject(new TypeError('cannot close an already-closing stream')); + stream._state = 'errored'; + stream._storedError = e; +} + +function WritableStreamFinishClose(stream) { + assert(stream._state === 'closing'); + + stream._state = 'closed'; + + const writer = stream._writer; + + // writer cannot be released while close() is ongoing. So, we can assert that + // there's an active writer. + assert(writer !== undefined); + + WritableStreamDefaultWriterClosedPromiseResolve(writer); +} + +function WritableStreamFulfillWriteRequest(stream) { + assert(stream._writeRequests.length > 0); + + const writeRequest = stream._writeRequests.shift(); + writeRequest._resolve(undefined); +} + +function WritableStreamUpdateBackpressure(stream, backpressure) { + const state = stream._state; + const writer = stream._writer; + + if (state === 'writable') { + if (backpressure === false) { + return; } - if (this._state === 'closed') { - return Promise.reject(new TypeError('cannot close an already-closed stream')); + + stream._state = 'waiting'; + + if (writer !== undefined) { + WritableStreamDefaultWriterReadyPromiseReset(writer); } - if (this._state === 'errored') { - return Promise.reject(this._storedError); + + return; + } + + assert(state === 'waiting'); + + if (backpressure === true) { + return; + } + + stream._state = 'writable'; + + if (writer !== undefined) { + WritableStreamDefaultWriterReadyPromiseResolve(writer, undefined); + } +} + +class WritableStreamDefaultWriter { + constructor(stream) { + if (IsWritableStream(stream) === false) { + throw new TypeError('WritableStreamDefaultWriter can only be constructed with a WritableStream instance'); } - if (this._state === 'waiting') { - this._readyPromise_resolve(undefined); + if (IsWritableStreamLocked(stream) === true) { + throw new TypeError('This stream has already been locked for exclusive writing by another writer'); } - this._state = 'closing'; - EnqueueValueWithSize(this._queue, 'close', 0); - CallOrScheduleWritableStreamAdvanceQueue(this); + this._ownerWritableStream = stream; + stream._writer = this; + + const state = stream._state; + + if (state === 'writable' || state === 'waiting' || state === 'closing') { + WritableStreamDefaultWriterClosedPromiseInitialize(this); + } else { + if (state === 'closed') { + WritableStreamDefaultWriterClosedPromiseInitializeAsResolved(this, undefined); + } else { + assert(state === 'errored', 'state must be errored'); + + WritableStreamDefaultWriterClosedPromiseInitializeAsRejected(this, stream._storedError); + } + } + + if (state === 'waiting') { + WritableStreamDefaultWriterReadyPromiseInitialize(this); + } else { + WritableStreamDefaultWriterReadyPromiseInitializeAsResolved(this, undefined); + } + } + + get closed() { + if (IsWritableStreamDefaultWriter(this) === false) { + return Promise.reject(CreateWritableStreamDefaultWriterBrandCheckException('closed')); + } return this._closedPromise; } + get desiredSize() { + if (IsWritableStreamDefaultWriter(this) === false) { + throw CreateWritableStreamDefaultWriterBrandCheckException('desiredSize'); + } + + if (this._ownerWritableStream === undefined) { + throw CreateWritableStreamDefaultWriterLockException('desiredSize'); + } + + return WritableStreamDefaultWriterGetDesiredSize(this) + } + get ready() { - if (!IsWritableStream(this)) { - return Promise.reject(new TypeError('WritableStream.prototype.ready can only be used on a WritableStream')); + if (IsWritableStreamDefaultWriter(this) === false) { + return Promise.reject(CreateWritableStreamDefaultWriterBrandCheckException('ready')); } return this._readyPromise; } - write(chunk) { - if (!IsWritableStream(this)) { - return Promise.reject(new TypeError('WritableStream.prototype.write can only be used on a WritableStream')); + abort(reason) { + if (IsWritableStreamDefaultWriter(this) === false) { + return Promise.reject(CreateWritableStreamDefaultWriterBrandCheckException('abort')); + } + + if (this._ownerWritableStream === undefined) { + return Promise.reject(CreateWritableStreamDefaultWriterLockException('abort')); } - if (this._state === 'closing') { - return Promise.reject(new TypeError('cannot write while stream is closing')); + return WritableStreamDefaultWriterAbort(this, reason); + } + + close() { + if (IsWritableStreamDefaultWriter(this) === false) { + return Promise.reject(CreateWritableStreamDefaultWriterBrandCheckException('close')); } - if (this._state === 'closed') { - return Promise.reject(new TypeError('cannot write after stream is closed')); + + const stream = this._ownerWritableStream; + + if (stream === undefined) { + return Promise.reject(CreateWritableStreamDefaultWriterLockException('close')); } - if (this._state === 'errored') { - return Promise.reject(this._storedError); + + if (stream._state === 'closing') { + return Promise.reject(new TypeError('cannot close an already-closing stream')); } - assert(this._state === 'waiting' || this._state === 'writable'); + return WritableStreamDefaultWriterClose(this); + } - let chunkSize = 1; + releaseLock() { + if (IsWritableStreamDefaultWriter(this) === false) { + throw CreateWritableStreamDefaultWriterBrandCheckException('releaseLock'); + } - if (this._strategySize !== undefined) { - try { - chunkSize = this._strategySize(chunk); - } catch (chunkSizeE) { - ErrorWritableStream(this, chunkSizeE); - return Promise.reject(chunkSizeE); - } + const stream = this._ownerWritableStream; + + if (stream === undefined) { + return undefined; } - let resolver, rejecter; - const promise = new Promise((resolve, reject) => { - resolver = resolve; - rejecter = reject; - }); + assert(stream._writer !== undefined); - const writeRecord = { promise: promise, chunk: chunk, _resolve: resolver, _reject: rejecter }; - try { - EnqueueValueWithSize(this._queue, writeRecord, chunkSize); - } catch (enqueueResultE) { - ErrorWritableStream(this, enqueueResultE); - return Promise.reject(enqueueResultE); + const state = stream._state; + + const releasedException = new TypeError('Writer was released and can no longer be used to monitor the stream\'s closedness'); + + if (state === 'writable' || state === 'waiting' || state === 'closing') { + WritableStreamDefaultWriterClosedPromiseReject(this, releasedException); + } else { + WritableStreamDefaultWriterClosedPromiseResetToRejected(this, releasedException); } - SyncWritableStreamStateWithQueue(this); - CallOrScheduleWritableStreamAdvanceQueue(this); - return promise; + if (state === 'waiting') { + WritableStreamDefaultWriterReadyPromiseReject(this, releasedException); + } else { + WritableStreamDefaultWriterReadyPromiseResetToRejected(this, releasedException); + } + + stream._writer = undefined; + this._ownerWritableStream = undefined; + } + + write(chunk) { + if (IsWritableStreamDefaultWriter(this) === false) { + return Promise.reject(CreateWritableStreamDefaultWriterBrandCheckException('write')); + } + + if (this._ownerWritableStream === undefined) { + return Promise.reject(CreateWritableStreamDefaultWriterLockException('write to')); + } + + if (this._ownerWritableStream._state === 'closing') { + return Promise.reject(new TypeError('Cannot write to an already-closed stream')); + } + + return WritableStreamDefaultWriterWrite(this, chunk); } } -exports.WritableStream = WritableStream; +// Helper functions for the WritableStreamDefaultWriter. -function closure_WritableStreamErrorFunction() { - const f = e => ErrorWritableStream(f._stream, e); - return f; +function CreateWritableStreamDefaultWriterBrandCheckException(name) { + return new TypeError('WritableStreamDefaultWriter.prototype.' + name + ' can only be used on a WritableStreamDefaultWriter'); } +function CreateWritableStreamDefaultWriterLockException(name) { + return new TypeError('Cannot ' + name + ' a stream using a released writer'); +} -function CallOrScheduleWritableStreamAdvanceQueue(stream) { - if (stream._started === false) { - stream._startedPromise.then(() => { - WritableStreamAdvanceQueue(stream); - }) - .catch(rethrowAssertionErrorRejection); - return undefined; +function WritableStreamDefaultWriterClosedPromiseInitialize(writer) { + writer._closedPromise = new Promise((resolve, reject) => { + writer._closedPromise_resolve = resolve; + writer._closedPromise_reject = reject; + }); +} + +function WritableStreamDefaultWriterClosedPromiseInitializeAsRejected(writer, reason) { + writer._closedPromise = Promise.reject(reason); + writer._closedPromise_resolve = undefined; + writer._closedPromise_reject = undefined; +} + +function WritableStreamDefaultWriterClosedPromiseInitializeAsResolved(writer, value) { + writer._closedPromise = Promise.resolve(value); + writer._closedPromise_resolve = undefined; + writer._closedPromise_reject = undefined; +} + +function WritableStreamDefaultWriterClosedPromiseReject(writer, reason) { + assert(writer._closedPromise_resolve !== undefined); + assert(writer._closedPromise_reject !== undefined); + + writer._closedPromise_reject(reason); + writer._closedPromise_resolve = undefined; + writer._closedPromise_reject = undefined; +} + +function WritableStreamDefaultWriterClosedPromiseResetToRejected(writer, reason) { + assert(writer._closedPromise_resolve === undefined); + assert(writer._closedPromise_reject === undefined); + + writer._closedPromise = Promise.reject(reason); +} + +function WritableStreamDefaultWriterClosedPromiseResolve(writer) { + assert(writer._closedPromise_resolve !== undefined); + assert(writer._closedPromise_reject !== undefined); + + writer._closedPromise_resolve(undefined); + writer._closedPromise_resolve = undefined; + writer._closedPromise_reject = undefined; +} + +function WritableStreamDefaultWriterReadyPromiseInitialize(writer) { + writer._readyPromise = new Promise((resolve, reject) => { + writer._readyPromise_resolve = resolve; + writer._readyPromise_reject = reject; + }); +} + +function WritableStreamDefaultWriterReadyPromiseInitializeAsResolved(writer) { + writer._readyPromise = Promise.resolve(undefined); + writer._readyPromise_resolve = undefined; + writer._readyPromise_reject = undefined; +} + +function WritableStreamDefaultWriterReadyPromiseReject(writer, reason) { + assert(writer._readyPromise_resolve !== undefined); + assert(writer._readyPromise_reject !== undefined); + + writer._readyPromise_reject(reason); + writer._readyPromise_resolve = undefined; + writer._readyPromise_reject = undefined; +} + +function WritableStreamDefaultWriterReadyPromiseReset(writer) { + assert(writer._readyPromise_resolve === undefined); + assert(writer._readyPromise_reject === undefined); + + writer._readyPromise = new Promise((resolve, reject) => { + writer._readyPromise_resolve = resolve; + writer._readyPromise_reject = reject; + }); +} + +function WritableStreamDefaultWriterReadyPromiseResetToRejected(writer, reason) { + assert(writer._readyPromise_resolve === undefined); + assert(writer._readyPromise_reject === undefined); + + writer._readyPromise = Promise.reject(reason); +} + +function WritableStreamDefaultWriterReadyPromiseResolve(writer, value) { + assert(writer._readyPromise_resolve !== undefined); + assert(writer._readyPromise_reject !== undefined); + + writer._readyPromise_resolve(value); + writer._readyPromise_resolve = undefined; + writer._readyPromise_reject = undefined; +} + +// Abstract operations for the WritableStreamDefaultWriter. + +function IsWritableStreamDefaultWriter(x) { + if (!typeIsObject(x)) { + return false; } - if (stream._started === true) { - return WritableStreamAdvanceQueue(stream); + if (!Object.prototype.hasOwnProperty.call(x, '_ownerWritableStream')) { + return false; } + + return true; } -function CloseWritableStream(stream) { - assert(stream._state === 'closing', 'stream must be in closing state while calling CloseWritableStream'); +// A client of WritableStreamDefaultWriter may use these functions directly to bypass state check. - const sinkClosePromise = PromiseInvokeOrNoop(stream._underlyingSink, 'close'); - sinkClosePromise.then( - () => { - if (stream._state === 'errored') { - return; - } +function WritableStreamDefaultWriterAbort(writer, reason) { + const stream = writer._ownerWritableStream; - assert(stream._state === 'closing'); + assert(stream !== undefined); - stream._closedPromise_resolve(undefined); - stream._closedPromise_resolve = undefined; - stream._closedPromise_reject = undefined; - stream._state = 'closed'; - }, - r => ErrorWritableStream(stream, r) - ) - .catch(rethrowAssertionErrorRejection); + return WritableStreamAbort(stream, reason); +} + +function WritableStreamDefaultWriterClose(writer) { + const stream = writer._ownerWritableStream; + + assert(stream !== undefined); + + const state = stream._state; + if (state === 'closed' || state === 'errored') { + return Promise.reject(new TypeError(`The stream (in ${state} state) is not in the writable state and cannot be closed`)); + } + + assert(state === 'writable' || state === 'waiting'); + + const promise = WritableStreamAddWriteRequest(stream); + + if (state === 'waiting') { + WritableStreamDefaultWriterReadyPromiseResolve(writer, undefined); + } + + stream._state = 'closing'; + + WritableStreamDefaultControllerClose(stream._writableStreamController); + + return promise; +} + +function WritableStreamDefaultWriterGetDesiredSize(writer) { + const stream = writer._ownerWritableStream; + + if (stream._state === 'errored') { + return null; + } + + return WritableStreamDefaultControllerGetDesiredSize(stream._writableStreamController); } -function ErrorWritableStream(stream, e) { - if (stream._state === 'closed' || stream._state === 'errored') { - return undefined; +function WritableStreamDefaultWriterWrite(writer, chunk) { + const stream = writer._ownerWritableStream; + + assert(stream !== undefined); + + const state = stream._state; + if (state === 'closed' || state === 'errored') { + return Promise.reject(new TypeError(`The stream (in ${state} state) is not in the writable state and cannot be written to`)); + } + + assert(state === 'writable' || state === 'waiting'); + + const promise = WritableStreamAddWriteRequest(stream); + + WritableStreamDefaultControllerWrite(stream._writableStreamController, chunk); + + return promise; +} + +class WritableStreamDefaultController { + constructor(stream, underlyingSink, size, highWaterMark) { + if (IsWritableStream(stream) === false) { + throw new TypeError('WritableStreamDefaultController can only be constructed with a WritableStream instance'); + } + + if (stream._writableStreamController !== undefined) { + throw new TypeError('WritableStreamDefaultController instances can only be created by the WritableStream constructor'); + } + + this._controlledWritableStream = stream; + + this._underlyingSink = underlyingSink; + + this._queue = []; + this._started = false; + this._writing = false; + + const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark); + this._strategySize = normalizedStrategy.size; + this._strategyHWM = normalizedStrategy.highWaterMark; + + WritableStreamDefaultControllerUpdateBackpressure(this); + + const controller = this; + + const startResult = InvokeOrNoop(underlyingSink, 'start', [this]); + Promise.resolve(startResult).then( + () => { + controller._started = true; + WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + r => { + WritableStreamDefaultControllerErrorIfNeeded(controller, r); + } + ) + .catch(rethrowAssertionErrorRejection); } - while (stream._queue.length > 0) { - const writeRecord = DequeueValue(stream._queue); - if (writeRecord !== 'close') { - writeRecord._reject(e); + error(e) { + if (IsWritableStreamDefaultController(this) === false) { + throw new TypeError('WritableStreamDefaultController.prototype.error can only be used on a WritableStreamDefaultController'); + } + + const state = this._controlledWritableStream._state; + if (state === 'closed' || state === 'errored') { + throw new TypeError(`The stream is ${state} and so cannot be errored`); } + + WritableStreamDefaultControllerError(this, e); } +} - stream._storedError = e; +// Abstract operations implementing interface required by the WritableStream. + +function WritableStreamDefaultControllerAbort(controller, reason) { + controller._queue = []; + + const sinkAbortPromise = PromiseInvokeOrFallbackOrNoop(controller._underlyingSink, 'abort', [reason], 'close', []); + return sinkAbortPromise.then(() => undefined); +} + +function WritableStreamDefaultControllerClose(controller) { + EnqueueValueWithSize(controller._queue, 'close', 0); + WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); +} + +function WritableStreamDefaultControllerGetDesiredSize(controller) { + const queueSize = GetTotalQueueSize(controller._queue); + return controller._strategyHWM - queueSize; +} - if (stream._state === 'waiting') { - stream._readyPromise_resolve(undefined); +function WritableStreamDefaultControllerWrite(controller, chunk) { + const stream = controller._controlledWritableStream; + + assert(stream._state === 'writable' || stream._state === 'waiting'); + + let chunkSize = 1; + + if (controller._strategySize !== undefined) { + try { + chunkSize = controller._strategySize(chunk); + } catch (chunkSizeE) { + // TODO: Should we notify the sink of this error? + WritableStreamDefaultControllerErrorIfNeeded(controller, chunkSizeE); + return Promise.reject(chunkSizeE); + } } - stream._closedPromise_reject(e); - stream._closedPromise_resolve = undefined; - stream._closedPromise_reject = undefined; - stream._state = 'errored'; + + const writeRecord = { chunk: chunk }; + + try { + EnqueueValueWithSize(controller._queue, writeRecord, chunkSize); + } catch (enqueueE) { + WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueE); + return Promise.reject(enqueueE); + } + + const state = stream._state; + if (state === 'writable' || state === 'waiting') { + WritableStreamDefaultControllerUpdateBackpressure(controller); + } + + WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + + return; } -function IsWritableStream(x) { +// Abstract operations for the WritableStreamDefaultController. + +function IsWritableStreamDefaultController(x) { if (!typeIsObject(x)) { return false; } @@ -234,65 +670,109 @@ function IsWritableStream(x) { return true; } -exports.IsWritableStream = IsWritableStream; +function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { + const state = controller._controlledWritableStream._state; + if (state === 'closed' || state === 'errored') { + return; + } -function SyncWritableStreamStateWithQueue(stream) { - if (stream._state === 'closing') { - return undefined; + if (controller._started === false) { + return; } - assert(stream._state === 'writable' || stream._state === 'waiting', - 'stream must be in a writable or waiting state while calling SyncWritableStreamStateWithQueue'); + if (controller._queue.length === 0) { + return; + } - const queueSize = GetTotalQueueSize(stream._queue); - const shouldApplyBackpressure = queueSize > stream._strategyHWM; + if (controller._writing === true) { + return; + } - if (shouldApplyBackpressure === true && stream._state === 'writable') { - stream._state = 'waiting'; - stream._readyPromise = new Promise((resolve, reject) => { - stream._readyPromise_resolve = resolve; - }); + const writeRecord = PeekQueueValue(controller._queue); + if (writeRecord === 'close') { + WritableStreamDefaultControllerProcessClose(controller); + } else { + WritableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk); } +} - if (shouldApplyBackpressure === false && stream._state === 'waiting') { - stream._state = 'writable'; - stream._readyPromise_resolve(undefined); +function WritableStreamDefaultControllerErrorIfNeeded(controller, e) { + const state = controller._controlledWritableStream._state; + if (state === 'writable' || state === 'waiting' || state === 'closing') { + WritableStreamDefaultControllerError(controller, e); } +} + +function WritableStreamDefaultControllerProcessClose(controller) { + const stream = controller._controlledWritableStream; + + assert(stream._state === 'closing', 'can\'t process final write record unless already closed'); + + DequeueValue(controller._queue); + assert(controller._queue.length === 0, 'queue must be empty once the final write record is dequeued'); + + const sinkClosePromise = PromiseInvokeOrNoop(controller._underlyingSink, 'close'); + sinkClosePromise.then( + () => { + if (stream._state !== 'closing') { + return; + } - return undefined; + WritableStreamFulfillWriteRequest(stream); + WritableStreamFinishClose(stream); + }, + r => { + WritableStreamDefaultControllerErrorIfNeeded(controller, r); + } + ) + .catch(rethrowAssertionErrorRejection); } -function WritableStreamAdvanceQueue(stream) { - if (stream._queue.length === 0 || stream._writing === true) { - return undefined; - } +function WritableStreamDefaultControllerProcessWrite(controller, chunk) { + controller._writing = true; - const writeRecord = PeekQueueValue(stream._queue); + const sinkWritePromise = PromiseInvokeOrNoop(controller._underlyingSink, 'write', [chunk]); + sinkWritePromise.then( + () => { + const stream = controller._controlledWritableStream; + const state = stream._state; + if (state === 'errored' || state === 'closed') { + return; + } - if (writeRecord === 'close') { - assert(stream._state === 'closing', 'can\'t process final write record unless already closing'); - DequeueValue(stream._queue); - assert(stream._queue.length === 0, 'queue must be empty once the final write record is dequeued'); - return CloseWritableStream(stream); - } else { - stream._writing = true; + controller._writing = false; - PromiseInvokeOrNoop(stream._underlyingSink, 'write', [writeRecord.chunk]).then( - () => { - if (stream._state === 'errored') { - return; - } + WritableStreamFulfillWriteRequest(stream); - stream._writing = false; + DequeueValue(controller._queue); + if (state !== 'closing') { + WritableStreamDefaultControllerUpdateBackpressure(controller); + } - writeRecord._resolve(undefined); + WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + r => { + WritableStreamDefaultControllerErrorIfNeeded(controller, r); + } + ) + .catch(rethrowAssertionErrorRejection); +} - DequeueValue(stream._queue); - SyncWritableStreamStateWithQueue(stream); - WritableStreamAdvanceQueue(stream); - }, - r => ErrorWritableStream(stream, r) - ) - .catch(rethrowAssertionErrorRejection); - } +function WritableStreamDefaultControllerUpdateBackpressure(controller) { + const desiredSize = WritableStreamDefaultControllerGetDesiredSize(controller); + const backpressure = desiredSize <= 0; + WritableStreamUpdateBackpressure(controller._controlledWritableStream, backpressure); +} + +// A client of WritableStreamDefaultController may use these functions directly to bypass state check. + +function WritableStreamDefaultControllerError(controller, e) { + const stream = controller._controlledWritableStream; + + const state = stream._state; + assert(state === 'writable' || state === 'waiting' || state === 'closing'); + + controller._queue = []; + + WritableStreamError(stream, e); } diff --git a/reference-implementation/test/bad-strategies.js b/reference-implementation/test/bad-strategies.js index 10aba0049..3a03ae209 100644 --- a/reference-implementation/test/bad-strategies.js +++ b/reference-implementation/test/bad-strategies.js @@ -30,12 +30,13 @@ test('Writable stream: throwing strategy.size method', t => { }); }, 'initial construction should not throw'); - ws.write('a').then( + const writer = ws.getWriter(); + writer.write('a').then( () => t.fail('write should not fulfill'), r => t.equal(r, theError, 'write should reject with the thrown error') ); - ws.closed.then( + writer.closed.then( () => t.fail('closed should not fulfill'), r => t.equal(r, theError, 'closed should reject with the thrown error') ); @@ -53,14 +54,15 @@ test('Writable stream: invalid strategy.size return value', t => { highWaterMark: 5 }); - ws.write('a').then( + const writer = ws.getWriter(); + writer.write('a').then( () => t.fail('write should not fulfill'), r => { t.equal(r.constructor, RangeError, `write should reject with a RangeError for ${size}`); theError = r; }); - ws.closed.catch(e => t.equal(e, theError, `closed should reject with the error for ${size}`)); + writer.closed.catch(e => t.equal(e, theError, `closed should reject with the error for ${size}`)); } }); diff --git a/reference-implementation/test/bad-underlying-sinks.js b/reference-implementation/test/bad-underlying-sinks.js index 039fae363..33a27c28f 100644 --- a/reference-implementation/test/bad-underlying-sinks.js +++ b/reference-implementation/test/bad-underlying-sinks.js @@ -37,12 +37,14 @@ test('Underlying sink: throwing write getter', t => { } }); - ws.write('a').then( + const writer = ws.getWriter(); + + writer.write('a').then( () => t.fail('write should not fulfill'), r => t.equal(r, theError, 'write should reject with the thrown error') ); - ws.closed.then( + writer.closed.then( () => t.fail('closed should not fulfill'), r => t.equal(r, theError, 'closed should reject with the thrown error') ); @@ -58,12 +60,14 @@ test('Underlying sink: throwing write method', t => { } }); - ws.write('a').then( + const writer = ws.getWriter(); + + writer.write('a').then( () => t.fail('write should not fulfill'), r => t.equal(r, theError, 'write should reject with the thrown error') ); - ws.closed.then( + writer.closed.then( () => t.fail('closed should not fulfill'), r => t.equal(r, theError, 'closed should reject with the thrown error') ); @@ -80,14 +84,16 @@ test('Underlying sink: throwing abort getter', t => { } }); - ws.abort(abortReason).then( + const writer = ws.getWriter(); + + writer.abort(abortReason).then( () => t.fail('abort should not fulfill'), r => t.equal(r, theError, 'abort should reject with the abort reason') ); - ws.closed.then( + writer.closed.then( () => t.fail('closed should not fulfill'), - r => t.equal(r, abortReason, 'closed should reject with the thrown error') + r => t.equal(r.constructor, TypeError, 'closed should reject with a TypeError') ); }); @@ -102,14 +108,16 @@ test('Underlying sink: throwing abort method', t => { } }); - ws.abort(abortReason).then( + const writer = ws.getWriter(); + + writer.abort(abortReason).then( () => t.fail('abort should not fulfill'), r => t.equal(r, theError, 'abort should reject with the abort reason') ); - ws.closed.then( + writer.closed.then( () => t.fail('closed should not fulfill'), - r => t.equal(r, abortReason, 'closed should reject with the thrown error') + r => t.equal(r.constructor, TypeError, 'closed should reject with a TypeError') ); }); @@ -123,7 +131,8 @@ test('Underlying sink: throwing close getter', t => { } }); - ws.close().then( + const writer = ws.getWriter(); + writer.close().then( () => t.fail('close should not fulfill'), r => t.equal(r, theError, 'close should reject with the thrown error') ); @@ -139,7 +148,8 @@ test('Underlying sink: throwing close method', t => { } }); - ws.close().then( + const writer = ws.getWriter(); + writer.close().then( () => t.fail('close should not fulfill'), r => t.equal(r, theError, 'close should reject with the thrown error') ); diff --git a/reference-implementation/test/brand-checks.js b/reference-implementation/test/brand-checks.js index caefd4bc6..7a03c64a7 100644 --- a/reference-implementation/test/brand-checks.js +++ b/reference-implementation/test/brand-checks.js @@ -1,23 +1,20 @@ 'use strict'; const test = require('tape-catch'); -function fakeWritableStream() { +function fakeWritableStreamDefaultWriter() { return { get closed() { return Promise.resolve(); }, + get desiredSize() { return 1; }, get ready() { return Promise.resolve(); }, - get state() { return 'closed' }, abort(reason) { return Promise.resolve(); }, close() { return Promise.resolve(); }, write(chunk) { return Promise.resolve(); } }; } -function realWritableStream() { - return new WritableStream(); -} - -function realReadableStream() { - return new ReadableStream(); +function realReadableStreamDefaultWriter() { + const rs = new ReadableStream(); + return rs.getReader(); } function fakeByteLengthQueuingStrategy() { @@ -76,39 +73,42 @@ function methodThrows(t, obj, methodName, target) { t.throws(() => method.call(target), /TypeError/, methodName + ' should throw a TypeError'); } +const ws = new WritableStream(); +const writer = ws.getWriter(); +const WritableStreamDefaultWriter = writer.constructor; -test('WritableStream.prototype.closed enforces a brand check', t => { +test('WritableStreamDefaultWriter.prototype.closed enforces a brand check', t => { t.plan(2); - getterRejects(t, WritableStream.prototype, 'closed', fakeWritableStream()); - getterRejects(t, WritableStream.prototype, 'closed', realReadableStream()); + getterRejects(t, WritableStreamDefaultWriter.prototype, 'closed', fakeWritableStreamDefaultWriter()); + getterRejects(t, WritableStreamDefaultWriter.prototype, 'closed', realReadableStreamDefaultWriter()); }); -test('WritableStream.prototype.ready enforces a brand check', t => { +test('WritableStreamDefaultWriter.prototype.desiredSize enforces a brand check', t => { t.plan(2); - getterRejects(t, WritableStream.prototype, 'ready', fakeWritableStream()); - getterRejects(t, WritableStream.prototype, 'ready', realReadableStream()); + getterThrows(t, WritableStreamDefaultWriter.prototype, 'desiredSize', fakeWritableStreamDefaultWriter()); + getterThrows(t, WritableStreamDefaultWriter.prototype, 'desiredSize', realReadableStreamDefaultWriter()); }); -test('WritableStream.prototype.state enforces a brand check', t => { +test('WritableStreamDefaultWriter.prototype.ready enforces a brand check', t => { t.plan(2); - getterThrows(t, WritableStream.prototype, 'state', fakeWritableStream()); - getterThrows(t, WritableStream.prototype, 'state', realReadableStream()); + getterRejects(t, WritableStreamDefaultWriter.prototype, 'ready', fakeWritableStreamDefaultWriter()); + getterRejects(t, WritableStreamDefaultWriter.prototype, 'ready', realReadableStreamDefaultWriter()); }); -test('WritableStream.prototype.abort enforces a brand check', t => { +test('WritableStreamDefaultWriter.prototype.abort enforces a brand check', t => { t.plan(2); - methodRejects(t, WritableStream.prototype, 'abort', fakeWritableStream()); - methodRejects(t, WritableStream.prototype, 'abort', realReadableStream()); + methodRejects(t, WritableStreamDefaultWriter.prototype, 'abort', fakeWritableStreamDefaultWriter()); + methodRejects(t, WritableStreamDefaultWriter.prototype, 'abort', realReadableStreamDefaultWriter()); }); -test('WritableStream.prototype.write enforces a brand check', t => { +test('WritableStreamDefaultWriter.prototype.write enforces a brand check', t => { t.plan(2); - methodRejects(t, WritableStream.prototype, 'write', fakeWritableStream()); - methodRejects(t, WritableStream.prototype, 'write', realReadableStream()); + methodRejects(t, WritableStreamDefaultWriter.prototype, 'write', fakeWritableStreamDefaultWriter()); + methodRejects(t, WritableStreamDefaultWriter.prototype, 'write', realReadableStreamDefaultWriter()); }); -test('WritableStream.prototype.close enforces a brand check', t => { +test('WritableStreamDefaultWriter.prototype.close enforces a brand check', t => { t.plan(2); - methodRejects(t, WritableStream.prototype, 'close', fakeWritableStream()); - methodRejects(t, WritableStream.prototype, 'close', realReadableStream()); + methodRejects(t, WritableStreamDefaultWriter.prototype, 'close', fakeWritableStreamDefaultWriter()); + methodRejects(t, WritableStreamDefaultWriter.prototype, 'close', realReadableStreamDefaultWriter()); }); diff --git a/reference-implementation/test/byte-length-queuing-strategy.js b/reference-implementation/test/byte-length-queuing-strategy.js index 89bda8b58..7755ca551 100644 --- a/reference-implementation/test/byte-length-queuing-strategy.js +++ b/reference-implementation/test/byte-length-queuing-strategy.js @@ -23,6 +23,7 @@ test('Closing a writable stream with in-flight writes below the high water mark new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 16 }) ); - ws.write({ byteLength: 1024 }); - ws.close(); + const writer = ws.getWriter(); + writer.write({ byteLength: 1024 }); + writer.close(); }); diff --git a/reference-implementation/test/count-queuing-strategy.js b/reference-implementation/test/count-queuing-strategy.js index c688a56a9..9d82bfad3 100644 --- a/reference-implementation/test/count-queuing-strategy.js +++ b/reference-implementation/test/count-queuing-strategy.js @@ -20,28 +20,30 @@ test('Correctly governs the value of a WritableStream\'s state property (HWM = 0 ); setTimeout(() => { - t.equal(ws.state, 'writable', 'After 0 writes, 0 of which finished, state should be \'writable\''); + const writer = ws.getWriter(); - const writePromiseA = ws.write('a'); - t.equal(ws.state, 'waiting', 'After 1 write, 0 of which finished, state should be \'waiting\''); + t.equal(writer.desiredSize, 0, 'desiredSize should be initially 0'); - const writePromiseB = ws.write('b'); - t.equal(ws.state, 'waiting', 'After 2 writes, 0 of which finished, state should be \'waiting\''); + const writePromiseA = writer.write('a'); + t.equal(writer.desiredSize, -1, 'desiredSize should be -1 after 1st write()'); + + const writePromiseB = writer.write('b'); + t.equal(writer.desiredSize, -2, 'desiredSize should be -2 after 2nd write()'); dones.a(); writePromiseA.then(() => { - t.equal(ws.state, 'waiting', 'After 2 writes, 1 of which finished, state should be \'waiting\''); + t.equal(writer.desiredSize, -1, 'desiredSize should be -1 after completing 1st write()'); dones.b(); return writePromiseB.then(() => { - t.equal(ws.state, 'writable', 'After 2 writes, 2 of which finished, state should be \'writable\''); + t.equal(writer.desiredSize, 0, 'desiredSize should be 0 after completing 2nd write()'); - const writePromiseC = ws.write('c'); - t.equal(ws.state, 'waiting', 'After 3 writes, 2 of which finished, state should be \'waiting\''); + const writePromiseC = writer.write('c'); + t.equal(writer.desiredSize, -1, 'desiredSize should be -1 after 3rd write()'); dones.c(); return writePromiseC.then(() => { - t.equal(ws.state, 'writable', 'After 3 writes, 3 of which finished, state should be \'writable\''); + t.equal(writer.desiredSize, 0, 'desiredSize should be 0 after completing 3rd write()'); t.end(); }); @@ -64,50 +66,52 @@ test('Correctly governs the value of a WritableStream\'s state property (HWM = 4 ); setTimeout(() => { - t.equal(ws.state, 'writable', 'After 0 writes, 0 of which finished, state should be \'writable\''); + const writer = ws.getWriter(); + + t.equal(writer.desiredSize, 4, 'desiredSize should be initially 4'); - const writePromiseA = ws.write('a'); - t.equal(ws.state, 'writable', 'After 1 write, 0 of which finished, state should be \'writable\''); + const writePromiseA = writer.write('a'); + t.equal(writer.desiredSize, 3, 'desiredSize should be 3 after 1st write()'); - const writePromiseB = ws.write('b'); - t.equal(ws.state, 'writable', 'After 2 writes, 0 of which finished, state should be \'writable\''); + const writePromiseB = writer.write('b'); + t.equal(writer.desiredSize, 2, 'desiredSize should be 2 after 2nd write()'); - const writePromiseC = ws.write('c'); - t.equal(ws.state, 'writable', 'After 3 writes, 0 of which finished, state should be \'writable\''); + const writePromiseC = writer.write('c'); + t.equal(writer.desiredSize, 1, 'desiredSize should be 1 after 3rd write()'); - const writePromiseD = ws.write('d'); - t.equal(ws.state, 'writable', 'After 4 writes, 0 of which finished, state should be \'writable\''); + const writePromiseD = writer.write('d'); + t.equal(writer.desiredSize, 0, 'desiredSize should be 0 after 4th write()'); - ws.write('e'); - t.equal(ws.state, 'waiting', 'After 5 writes, 0 of which finished, state should be \'waiting\''); + writer.write('e'); + t.equal(writer.desiredSize, -1, 'desiredSize should be -1 after 5th write()'); - ws.write('f'); - t.equal(ws.state, 'waiting', 'After 6 writes, 0 of which finished, state should be \'waiting\''); + writer.write('f'); + t.equal(writer.desiredSize, -2, 'desiredSize should be -2 after 6th write()'); - ws.write('g'); - t.equal(ws.state, 'waiting', 'After 7 writes, 0 of which finished, state should be \'waiting\''); + writer.write('g'); + t.equal(writer.desiredSize, -3, 'desiredSize should be -3 after 7th write()'); dones.a(); writePromiseA.then(() => { - t.equal(ws.state, 'waiting', 'After 7 writes, 1 of which finished, state should be \'waiting\''); + t.equal(writer.desiredSize, -2, 'desiredSize should be -2 after completing 1st write()'); dones.b(); return writePromiseB.then(() => { - t.equal(ws.state, 'waiting', 'After 7 writes, 2 of which finished, state should be \'waiting\''); + t.equal(writer.desiredSize, -1, 'desiredSize should be -1 after completing 2nd write()'); dones.c(); return writePromiseC.then(() => { - t.equal(ws.state, 'writable', 'After 7 writes, 3 of which finished, state should be \'writable\''); + t.equal(writer.desiredSize, 0, 'desiredSize should be 0 after completing 3rd write()'); - ws.write('h'); - t.equal(ws.state, 'waiting', 'After 8 writes, 3 of which finished, state should be \'waiting\''); + writer.write('h'); + t.equal(writer.desiredSize, -1, 'desiredSize should be -1 after 8th write()'); dones.d(); return writePromiseD.then(() => { - t.equal(ws.state, 'writable', 'After 8 writes, 4 of which finished, state should be \'writable\''); + t.equal(writer.desiredSize, 0, 'desiredSize should be 0 after completing 4th write()'); - ws.write('i'); - t.equal(ws.state, 'waiting', 'After 9 writes, 4 of which finished, state should be \'waiting\''); + writer.write('i'); + t.equal(writer.desiredSize, -1, 'desiredSize should be -1 after 9th write()'); t.end(); }); diff --git a/reference-implementation/test/pipe-through.js b/reference-implementation/test/pipe-through.js index d1c09e6f6..84cbec218 100644 --- a/reference-implementation/test/pipe-through.js +++ b/reference-implementation/test/pipe-through.js @@ -14,8 +14,6 @@ test('Piping through a duck-typed pass-through transform stream works', t => { }); test('Piping through an identity transform stream will close the destination when the source closes', t => { - t.plan(1); - const rs = new ReadableStream({ start(c) { c.enqueue('a'); @@ -25,8 +23,13 @@ test('Piping through an identity transform stream will close the destination whe } }); + let enqueue; + const ts = new TransformStream({ - transform(chunk, enqueue, done) { + start(e) { + enqueue = e; + }, + transform(chunk, done) { enqueue(chunk); done(); } @@ -35,7 +38,7 @@ test('Piping through an identity transform stream will close the destination whe const ws = new WritableStream(); rs.pipeThrough(ts).pipeTo(ws).then(() => { - t.equal(ws.state, 'closed', 'the writable stream was closed'); + t.end(); }) .catch(e => t.error(e)); }); @@ -62,8 +65,13 @@ test.skip('Piping through a default transform stream causes backpressure to be e } }); + let enqueue; + const ts = new TransformStream({ - transform(chunk, enqueue, done) { + start(e) { + enqueue = e; + }, + transform(chunk, done) { enqueue(chunk); done(); } diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js index 2e6c22e25..6582421dd 100644 --- a/reference-implementation/test/pipe-to.js +++ b/reference-implementation/test/pipe-to.js @@ -5,8 +5,28 @@ const sequentialReadableStream = require('./utils/sequential-rs.js'); // TODO: many asserts in this file are unlabeled; we should label them. +function promise_fulfills(t, expectedValue, promise, msg) { + promise.then(value => { + t.equal(value, expectedValue, msg); + }, reason => { + t.fail(msg + ': Rejected unexpectedly with: ' + reason); + }); +} + +function promise_rejects(t, expectedReason, promise, msg) { + promise.then(value => { + t.fail(name + ': Fulfilled unexpectedly wuth: ' + value); + }, reason => { + if (typeof expectedReason === 'function') { + t.equal(reason.constructor, expectedReason, msg); + } else { + t.equal(reason, expectedReason, msg); + } + }); +} + test('Piping from a ReadableStream from which lots of data are readable synchronously', t => { - t.plan(4); + t.plan(3); const rs = new ReadableStream({ start(c) { @@ -19,8 +39,6 @@ test('Piping from a ReadableStream from which lots of data are readable synchron const ws = new WritableStream({}, new CountQueuingStrategy({ highWaterMark: 1000 })); - t.equal(ws.state, 'writable', 'writable stream state should start out writable'); - let pipeFinished = false; rs.pipeTo(ws).then( () => { @@ -28,7 +46,8 @@ test('Piping from a ReadableStream from which lots of data are readable synchron rs.getReader().closed.then(() => { t.pass('readable stream should be closed after pipe finishes'); }); - t.equal(ws.state, 'closed', 'writable stream state should be closed after pipe finishes'); + promise_fulfills(t, undefined, ws.getWriter().closed, + 'writable stream should be closed after pipe finishes'); } ) .catch(e => t.error(e)); @@ -39,7 +58,7 @@ test('Piping from a ReadableStream from which lots of data are readable synchron }); test('Piping from a ReadableStream in readable state to a WritableStream in closing state', t => { - t.plan(4); + t.plan(3); let cancelReason; const rs = new ReadableStream({ @@ -61,20 +80,22 @@ test('Piping from a ReadableStream in readable state to a WritableStream in clos } }); - ws.close(); - t.equal(ws.state, 'closing', 'writable stream should be closing immediately after closing it'); - - rs.pipeTo(ws).then( - () => t.fail('promise returned by pipeTo should not fulfill'), - r => { - t.equal(r, cancelReason, - 'the pipeTo promise should reject with the same error as the underlying source cancel was called with'); - rs.getReader().closed.then(() => { - t.pass('readable stream should be closed after pipe finishes'); - }); - } - ) - .catch(e => t.error(e)); + const writer = ws.getWriter(); + writer.close().then(() => { + writer.releaseLock(); + + rs.pipeTo(ws).then( + () => t.fail('promise returned by pipeTo should not fulfill'), + r => { + t.equal(r, cancelReason, + 'the pipeTo promise should reject with the same error as the underlying source cancel was called with'); + rs.getReader().closed.then(() => { + t.pass('readable stream should be closed after pipe finishes'); + }); + } + ) + .catch(e => t.error(e)); + }); }); test('Piping from a ReadableStream in readable state to a WritableStream in errored state', t => { @@ -121,24 +142,26 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro }); startPromise.then(() => { - ws.write('Hello'); - t.assert(writeCalled, 'write must be called'); + const writer = ws.getWriter(); - ws.ready.then(() => { - t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state'); + writer.write('Hello'); + t.assert(writeCalled, 'write must be called'); + writer.closed.catch(() => { + writer.releaseLock(); rs.pipeTo(ws).catch(e => { t.equal(e, passedError, 'pipeTo promise should be rejected with the error'); t.assert(cancelCalled, 'cancel should have been called'); t.end(); }); }); - }); + }) + .catch(e => t.error(e)); }); test('Piping from a ReadableStream in the readable state which becomes closed after pipeTo call to a WritableStream ' + 'in the writable state', t => { - t.plan(5); + t.plan(4); let closeReadableStream; let pullCount = 0; @@ -180,18 +203,18 @@ test('Piping from a ReadableStream in the readable state which becomes closed af startPromise.then(() => { rs.pipeTo(ws).then(() => { - t.equal(ws.state, 'closed', 'writable stream should be closed after pipeTo completes'); + const writer = ws.getWriter(); + promise_fulfills(t, undefined, writer.closed, 'writer.closed promise should fulfill'); }); - t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo'); - closeReadableStream(); - }); + }) + .catch(e => t.error(e)); }); test('Piping from a ReadableStream in the readable state which becomes errored after pipeTo call to a WritableStream ' + 'in the writable state', t => { - t.plan(5); + t.plan(4); let errorReadableStream; let pullCount = 0; @@ -215,7 +238,6 @@ test('Piping from a ReadableStream in the readable state which becomes errored a return startPromise; }, write(chunk) { - t.fail('Unexpected extra write call'); }, close() { t.fail('Unexpected close call'); @@ -229,18 +251,19 @@ test('Piping from a ReadableStream in the readable state which becomes errored a startPromise.then(() => { rs.pipeTo(ws).catch(e => { t.equal(e, passedError, 'pipeTo should be rejected with the passed error'); - t.equal(ws.state, 'errored', 'writable stream should be errored after pipeTo completes'); + const writer = ws.getWriter(); + promise_rejects(t, TypeError, writer.closed, 'writer.closed should reject'); }); - t.equal(ws.state, 'writable', 'writable stream should still be writable immediately after pipeTo'); - errorReadableStream(passedError); - }); + }) + .catch(e => t.error(e)); }); test('Piping from an empty ReadableStream which becomes non-empty after pipeTo call to a WritableStream in the ' + 'writable state', t => { - t.plan(3); + t.plan(2); + let controller; let pullCount = 0; const rs = new ReadableStream({ @@ -269,14 +292,13 @@ test('Piping from an empty ReadableStream which becomes non-empty after pipeTo c }); rs.pipeTo(ws).then(() => t.fail('pipeTo promise should not fulfill')); - t.equal(ws.state, 'writable', 'writable stream should start in writable state'); controller.enqueue('Hello'); }); test('Piping from an empty ReadableStream which becomes errored after pipeTo call to a WritableStream in the ' + 'writable state', t => { - t.plan(3); + t.plan(2); let errorReadableStream; const rs = new ReadableStream({ @@ -305,13 +327,13 @@ test('Piping from an empty ReadableStream which becomes errored after pipeTo cal }); rs.pipeTo(ws).catch(e => t.equal(e, passedError, 'pipeTo should reject with the passed error')); - t.equal(ws.state, 'writable', 'writable stream should start out writable'); + errorReadableStream(passedError); }); test('Piping from an empty ReadableStream to a WritableStream in the writable state which becomes errored after a ' + 'pipeTo call', t => { - t.plan(6); + t.plan(4); const theError = new Error('cancel with me!'); @@ -326,11 +348,11 @@ test('Piping from an empty ReadableStream to a WritableStream in the writable st } }); - let errorWritableStream; + let writableController; const startPromise = Promise.resolve(); const ws = new WritableStream({ - start(error) { - errorWritableStream = error; + start(c) { + writableController = c; return startPromise; }, write(chunk) { @@ -345,14 +367,16 @@ test('Piping from an empty ReadableStream to a WritableStream in the writable st }); startPromise.then(() => { - t.equal(ws.state, 'writable', 'ws should start writable'); + rs.pipeTo(ws).catch(e => { + t.equal(e, theError, 'pipeTo should reject with the passed error') - rs.pipeTo(ws).catch(e => t.equal(e, theError, 'pipeTo should reject with the passed error')); - t.equal(ws.state, 'writable', 'ws should be writable after pipe'); + promise_rejects(t, theError, ws.getWriter().closed, 'ws.closed should reject with theError'); + }) + .catch(e => t.error(e)); - errorWritableStream(theError); - t.equal(ws.state, 'errored', 'ws should be errored after erroring it'); - }); + writableController.error(theError); + }) + .catch(e => t.error(e)); }); test('Piping from a non-empty ReadableStream to a WritableStream in the waiting state which becomes writable after a ' + @@ -393,20 +417,17 @@ test('Piping from a non-empty ReadableStream to a WritableStream in the waiting t.fail('Unexpected abort call'); } }); - ws.write('Hello'); startPromise.then(() => { - t.equal(ws.state, 'waiting', 'writable stream state should start waiting'); + const writer = ws.getWriter(); + writer.write('Hello').catch(e => t.error(e, 'write() rejected')); + writer.releaseLock(); - rs.pipeTo(ws); - t.equal(ws.state, 'waiting', 'writable stream state should still be waiting immediately after piping'); + rs.pipeTo(ws).catch(e => t.error(e, 'rs.pipeTo() rejected')); resolveWritePromise(); - ws.ready.then(() => { - t.equal(ws.state, 'writable', 'writable stream should eventually become writable (when ready fulfills)'); - }) - .catch(e => t.error(e)); - }); + }) + .catch(e => t.error(e, 'startPromise.then() rejected')); }); test('Piping from a non-empty ReadableStream to a WritableStream in waiting state which becomes errored after a ' + @@ -428,11 +449,11 @@ test('Piping from a non-empty ReadableStream to a WritableStream in waiting stat } }); - let errorWritableStream; + let writableController; const startPromise = Promise.resolve(); const ws = new WritableStream({ - start(error) { - errorWritableStream = error; + start(c) { + writableController = c; return startPromise; }, write(chunk) { @@ -448,29 +469,28 @@ test('Piping from a non-empty ReadableStream to a WritableStream in waiting stat t.fail('Unexpected abort call'); } }); - ws.write('Hello'); - startPromise.then(() => { - t.equal(ws.state, 'waiting'); + const writer = ws.getWriter(); + writer.write('Hello'); + writer.releaseLock(); + startPromise.then(() => { rs.pipeTo(ws); - t.equal(ws.state, 'waiting'); - errorWritableStream(); - t.equal(ws.state, 'errored'); + writableController.error(); }); }); test('Piping from a non-empty ReadableStream which becomes errored after pipeTo call to a WritableStream in the ' + 'waiting state', t => { - t.plan(6); + t.plan(4); - let errorReadableStream; + let readableController; let pullCount = 0; const rs = new ReadableStream({ start(c) { c.enqueue('World'); - errorReadableStream = c.error.bind(c); + readableController = c; }, pull() { ++pullCount; @@ -502,26 +522,27 @@ test('Piping from a non-empty ReadableStream which becomes errored after pipeTo t.pass('underlying source abort was called'); } }); - ws.write('Hello'); + + const writer = ws.getWriter(); + writer.write('Hello'); + writer.releaseLock(); startPromise.then(() => { - t.equal(ws.state, 'waiting'); t.equal(pullCount, 0); rs.pipeTo(ws); - t.equal(ws.state, 'waiting'); - errorReadableStream(); + readableController.error(); }); }); test('Piping from a non-empty ReadableStream to a WritableStream in the waiting state where both become ready ' + 'after a pipeTo', t => { - let controller; + let readableController; let pullCount = 0; const rs = new ReadableStream({ start(c) { - controller = c; + readableController = c; }, pull() { ++pullCount; @@ -557,26 +578,31 @@ test('Piping from a non-empty ReadableStream to a WritableStream in the waiting t.fail('Unexpected abort call'); } }); - ws.write('Hello'); + + const writer = ws.getWriter(); + writer.write('Hello'); startPromise.then(() => { t.equal(writeCount, 1, 'exactly one write should have happened'); - t.equal(ws.state, 'waiting', 'writable stream should be waiting'); + t.equal(writer.desiredSize, 0, 'writer.desiredSize should be 0'); + writer.releaseLock(); t.equal(pullCount, 1, 'pull should have been called only once'); rs.pipeTo(ws); - controller.enqueue('Goodbye'); + readableController.enqueue('Goodbye'); // Check that nothing happens before calling resolveWritePromise(), and then call resolveWritePromise() // to check that pipeTo is woken up. t.equal(pullCount, 1, 'after the pipeTo and enqueue, pull still should have been called only once'); + resolveWritePromise(); - }); + }) + .catch(e => t.error(e, 'startPromise.then() rejected')); }); test('Piping from an empty ReadableStream to a WritableStream in the waiting state which becomes writable after a ' + - 'pipeTo call', t => { + 'pipeTo() call', t => { let pullCount = 0; const rs = new ReadableStream({ pull() { @@ -606,13 +632,14 @@ test('Piping from an empty ReadableStream to a WritableStream in the waiting sta t.fail('Unexpected abort call'); } }); - ws.write('Hello'); - startPromise.then(() => { - t.equal(ws.state, 'waiting'); + const writer = ws.getWriter(); + writer.write('Hello'); + writer.releaseLock(); + startPromise.then(() => { rs.pipeTo(ws); - t.equal(ws.state, 'waiting'); + t.equal(pullCount, 1); resolveWritePromise(); @@ -624,15 +651,15 @@ test('Piping from an empty ReadableStream to a WritableStream in the waiting sta }); }); -test('Piping from an empty ReadableStream which becomes closed after a pipeTo call to a WritableStream in the ' + +test('Piping from an empty ReadableStream which becomes closed after a pipeTo() call to a WritableStream in the ' + 'waiting state whose writes never complete', t => { - t.plan(4); + t.plan(2); - let closeReadableStream; + let readableController; let pullCount = 0; const rs = new ReadableStream({ start(c) { - closeReadableStream = c.close.bind(c); + readableController = c; }, pull() { ++pullCount; @@ -652,7 +679,7 @@ test('Piping from an empty ReadableStream which becomes closed after a pipeTo ca if (!writeCalled) { t.equal(chunk, 'Hello', 'the chunk should be written to the writable stream'); writeCalled = true; - closeReadableStream(); + readableController.close(); } else { t.fail('Unexpected extra write call'); } @@ -665,29 +692,30 @@ test('Piping from an empty ReadableStream which becomes closed after a pipeTo ca t.fail('Unexpected abort call'); } }); - ws.write('Hello'); - startPromise.then(() => { - t.equal(ws.state, 'waiting', 'the writable stream should be in the waiting state after starting'); + const writer = ws.getWriter(); + writer.write('Hello'); + writer.releaseLock(); - rs.pipeTo(ws); + startPromise.then(() => { + rs.pipeTo(ws).catch(e => t.error(e, 'rs.pipeTo() rejected')); setTimeout(() => { - t.equal(ws.state, 'waiting', 'the writable stream should still be waiting since the write never completed'); t.equal(pullCount, 1, 'pull should have been called only once'); }, 50); - }); + }) + .catch(e => t.error(e, 'startPromise.then() rejected')); }); test('Piping from an empty ReadableStream which becomes errored after a pipeTo call to a WritableStream in the ' + 'waiting state', t => { - t.plan(5); + t.plan(4); - let errorReadableStream; + let readableController; let pullCount = 0; const rs = new ReadableStream({ start(c) { - errorReadableStream = c.error.bind(c); + readableController = c; }, pull() { ++pullCount; @@ -706,7 +734,7 @@ test('Piping from an empty ReadableStream which becomes errored after a pipeTo c }, write(chunk) { if (!writeCalled) { - t.equal(chunk, 'Hello'); + t.equal(chunk, 'Hello', 'chunk'); writeCalled = true; } else { t.fail('Unexpected extra write call'); @@ -717,19 +745,20 @@ test('Piping from an empty ReadableStream which becomes errored after a pipeTo c t.fail('Unexpected close call'); }, abort(reason) { - t.equal(reason, passedError); - t.assert(writeCalled); - t.equal(pullCount, 1); + t.equal(reason, passedError, 'reason should be passedError'); + t.assert(writeCalled, 'writeCalled should be true'); + t.equal(pullCount, 1, 'pullCount should be 1'); } }); - ws.write('Hello'); - startPromise.then(() => { - t.equal(ws.state, 'waiting'); + const writer = ws.getWriter(); + writer.write('Hello'); + writer.releaseLock(); + startPromise.then(() => { rs.pipeTo(ws); - errorReadableStream(passedError); + readableController.error(passedError); }); }); @@ -741,8 +770,7 @@ test('Piping to a duck-typed asynchronous "writable stream" works', t => { const rs = sequentialReadableStream(5, { async: true }); const chunksWritten = []; - const dest = { - state: 'writable', + const writer = { write(chunk) { chunksWritten.push(chunk); return Promise.resolve(); @@ -760,85 +788,55 @@ test('Piping to a duck-typed asynchronous "writable stream" works', t => { closed: new Promise(() => { }) }; - rs.pipeTo(dest); + const ws = { + getWriter() { return writer; } + }; + + rs.pipeTo(ws); }); test('Piping to a stream that has been aborted passes through the error as the cancellation reason', t => { - let recordedReason; + let recordedCancelReason; const rs = new ReadableStream({ cancel(reason) { - recordedReason = reason; + recordedCancelReason = reason; } }); const ws = new WritableStream(); const passedReason = new Error('I don\'t like you.'); - ws.abort(passedReason); + const writer = ws.getWriter(); + writer.abort(passedReason); + writer.releaseLock(); rs.pipeTo(ws).catch(e => { - t.equal(e, passedReason, 'pipeTo rejection reason should be the cancellation reason'); - t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason'); + t.equal(e.constructor, TypeError, 'pipeTo rejection reason should be a TypeError'); + t.equal(recordedCancelReason.constructor, TypeError, 'the recorded cancellation reason must be a TypeError'); t.end(); - }); -}); - -test('Piping to a stream and then aborting it passes through the error as the cancellation reason', t => { - let recordedReason; - const rs = new ReadableStream({ - cancel(reason) { - recordedReason = reason; - } - }); - - const ws = new WritableStream(); - const passedReason = new Error('I don\'t like you.'); - - const pipeToPromise = rs.pipeTo(ws); - ws.abort(passedReason); - - pipeToPromise.catch(e => { - t.equal(e, passedReason, 'pipeTo rejection reason should be the abortion reason'); - t.equal(recordedReason, passedReason, 'the recorded cancellation reason must be the passed abort reason'); - t.end(); - }); + }) + .catch(e => t.error(e)); }); test('Piping to a stream that has been closed propagates a TypeError cancellation reason backward', t => { - let recordedReason; + let recordedCancelReason; const rs = new ReadableStream({ cancel(reason) { - recordedReason = reason; + recordedCancelReason = reason; } }); const ws = new WritableStream(); - ws.close(); - - rs.pipeTo(ws).catch(e => { - t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError'); - t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError'); - t.end(); - }); -}); - -test('Piping to a stream and then closing it propagates a TypeError cancellation reason backward', t => { - let recordedReason; - const rs = new ReadableStream({ - cancel(reason) { - recordedReason = reason; - } - }); - - const ws = new WritableStream(); - - const pipeToPromise = rs.pipeTo(ws); - ws.close(); - - pipeToPromise.catch(e => { - t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError'); - t.equal(recordedReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError'); - t.end(); - }); + const writer = ws.getWriter(); + writer.close().then(() => { + writer.releaseLock(); + rs.pipeTo(ws).catch(e => { + t.equal(e.constructor, TypeError, 'the rejection reason for the pipeTo promise should be a TypeError'); + t.equal(recordedCancelReason.constructor, TypeError, 'the recorded cancellation reason should be a TypeError'); + t.end(); + }) + .catch(e => t.error(e)); + }) + .catch(e => t.error(e)); }); test('Piping to a stream that errors on write should pass through the error as the cancellation reason', t => { @@ -961,10 +959,10 @@ test('Piping to a writable stream that does not consume the writes fast enough e const chunksGivenToWrite = []; const chunksFinishedWriting = []; - const startPromise = Promise.resolve(); + const writableStartPromise = Promise.resolve(); const ws = new WritableStream({ start() { - return startPromise; + return writableStartPromise; }, write(chunk) { chunksGivenToWrite.push(chunk); @@ -977,17 +975,14 @@ test('Piping to a writable stream that does not consume the writes fast enough e } }); - startPromise.then(() => { + writableStartPromise.then(() => { rs.pipeTo(ws).then(() => { t.deepEqual(desiredSizes, [1, 1, 0, -1], 'backpressure was correctly exerted at the source'); t.deepEqual(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks were written'); t.end(); }); - t.equal(ws.state, 'writable', 'at t = 0 ms, ws should be writable'); - setTimeout(() => { - t.equal(ws.state, 'waiting', 'at t = 125 ms, ws should be waiting'); t.deepEqual(chunksGivenToWrite, ['a'], 'at t = 125 ms, ws.write should have been called with one chunk'); t.deepEqual(chunksFinishedWriting, [], 'at t = 125 ms, no chunks should have finished writing'); @@ -998,7 +993,6 @@ test('Piping to a writable stream that does not consume the writes fast enough e }, 125); setTimeout(() => { - t.equal(ws.state, 'waiting', 'at t = 225 ms, ws should be waiting'); t.deepEqual(chunksGivenToWrite, ['a'], 'at t = 225 ms, ws.write should have been called with one chunk'); t.deepEqual(chunksFinishedWriting, [], 'at t = 225 ms, no chunks should have finished writing'); @@ -1009,7 +1003,6 @@ test('Piping to a writable stream that does not consume the writes fast enough e }, 225); setTimeout(() => { - t.equal(ws.state, 'waiting', 'at t = 325 ms, ws should be waiting'); t.deepEqual(chunksGivenToWrite, ['a'], 'at t = 325 ms, ws.write should have been called with one chunk'); t.deepEqual(chunksFinishedWriting, [], 'at t = 325 ms, no chunks should have finished writing'); @@ -1021,7 +1014,6 @@ test('Piping to a writable stream that does not consume the writes fast enough e }, 325); setTimeout(() => { - t.equal(ws.state, 'waiting', 'at t = 425 ms, ws should be waiting'); t.deepEqual(chunksGivenToWrite, ['a'], 'at t = 425 ms, ws.write should have been called with one chunk'); t.deepEqual(chunksFinishedWriting, [], 'at t = 425 ms, no chunks should have finished writing'); @@ -1032,7 +1024,6 @@ test('Piping to a writable stream that does not consume the writes fast enough e }, 425); setTimeout(() => { - t.equal(ws.state, 'waiting', 'at t = 475 ms, ws should be waiting'); t.deepEqual(chunksGivenToWrite, ['a', 'b'], 'at t = 475 ms, ws.write should have been called with two chunks'); t.deepEqual(chunksFinishedWriting, ['a'], 'at t = 475 ms, one chunk should have finished writing'); }, 475); diff --git a/reference-implementation/test/templated/readable-stream-closed.js b/reference-implementation/test/templated/readable-stream-closed.js index 50c978bbe..b04242db0 100644 --- a/reference-implementation/test/templated/readable-stream-closed.js +++ b/reference-implementation/test/templated/readable-stream-closed.js @@ -7,7 +7,8 @@ module.exports = (label, factory) => { } test('piping to a WritableStream in the writable state should close the writable stream', t => { - t.plan(4); + t.plan(2); + const rs = factory(); const startPromise = Promise.resolve(); @@ -27,18 +28,16 @@ module.exports = (label, factory) => { }); startPromise.then(() => { - t.equal(ws.state, 'writable', 'writable stream should start in writable state'); - return rs.pipeTo(ws).then(() => { t.pass('pipeTo promise should be fulfilled'); - t.equal(ws.state, 'closed', 'writable stream should become closed'); }); }) .catch(e => t.error(e)); }); test('piping to a WritableStream in the writable state with { preventClose: true } should do nothing', t => { - t.plan(3); + t.plan(1); + const rs = factory(); const startPromise = Promise.resolve(); @@ -58,11 +57,8 @@ module.exports = (label, factory) => { }); startPromise.then(() => { - t.equal(ws.state, 'writable', 'writable stream should start in writable state'); - return rs.pipeTo(ws, { preventClose: true }).then(() => { t.pass('pipeTo promise should be fulfilled'); - t.equal(ws.state, 'writable', 'writable stream should still be writable'); }); }) .catch(e => t.error(e)); diff --git a/reference-implementation/test/templated/readable-stream-errored-async-only.js b/reference-implementation/test/templated/readable-stream-errored-async-only.js index e55a3f56e..c7efbbd44 100644 --- a/reference-implementation/test/templated/readable-stream-errored-async-only.js +++ b/reference-implementation/test/templated/readable-stream-errored-async-only.js @@ -7,7 +7,8 @@ module.exports = (label, factory, error) => { } test('piping with no options', t => { - t.plan(4); + t.plan(3); + const rs = factory(); const ws = new WritableStream({ @@ -17,15 +18,15 @@ module.exports = (label, factory, error) => { }); rs.pipeTo(ws).catch(e => { - t.equal(ws.state, 'errored', 'destination should be errored'); t.equal(e, error, 'rejection reason of pipeToPromise should be the source error'); - }); - ws.closed.catch(e => t.equal(e, error), 'rejection reason of dest closed should be the source error'); + ws.getWriter().closed.catch(e => t.equal(e.constructor, TypeError), 'rejection reason of dest closed should be a TypeError'); + }); }); test('piping with { preventAbort: false }', t => { - t.plan(4); + t.plan(3); + const rs = factory(); const ws = new WritableStream({ @@ -35,15 +36,15 @@ module.exports = (label, factory, error) => { }); rs.pipeTo(ws, { preventAbort: false }).catch(e => { - t.equal(ws.state, 'errored', 'destination should be errored'); t.equal(e, error, 'rejection reason of pipeToPromise should be the source error'); - }); - ws.closed.catch(e => t.equal(e, error), 'rejection reason of dest closed should be the source error'); + ws.getWriter().closed.catch(e => t.equal(e.constructor, TypeError), 'rejection reason of dest closed should be a TypeError'); + }); }); test('piping with { preventAbort: true }', t => { - t.plan(2); + t.plan(1); + const rs = factory(); const ws = new WritableStream({ @@ -53,7 +54,6 @@ module.exports = (label, factory, error) => { }); rs.pipeTo(ws, { preventAbort: true }).catch(e => { - t.equal(ws.state, 'writable', 'destination should remain writable'); t.equal(e, error, 'rejection reason of pipeToPromise should be the source error'); }); }); diff --git a/reference-implementation/test/templated/readable-stream-errored.js b/reference-implementation/test/templated/readable-stream-errored.js index 483c76bf2..bebe7f926 100644 --- a/reference-implementation/test/templated/readable-stream-errored.js +++ b/reference-implementation/test/templated/readable-stream-errored.js @@ -7,7 +7,7 @@ module.exports = (label, factory, error) => { } test('piping to a WritableStream in the writable state should abort the writable stream', t => { - t.plan(4); + t.plan(2); const rs = factory(); @@ -23,18 +23,15 @@ module.exports = (label, factory, error) => { t.fail('Unexpected close call'); }, abort(reason) { - t.equal(reason, error); + t.equal(reason, error, 'abort() of the underlying source should be called with error'); } }); startPromise.then(() => { - t.equal(ws.state, 'writable'); - rs.pipeTo(ws).then( () => t.fail('pipeTo promise should not be fulfilled'), e => { t.equal(e, error, 'pipeTo promise should be rejected with the passed error'); - t.equal(ws.state, 'errored', 'writable stream should become errored'); } ); }); diff --git a/reference-implementation/test/templated/readable-stream-two-chunks-closed.js b/reference-implementation/test/templated/readable-stream-two-chunks-closed.js index 637d94455..4aa921aa6 100644 --- a/reference-implementation/test/templated/readable-stream-two-chunks-closed.js +++ b/reference-implementation/test/templated/readable-stream-two-chunks-closed.js @@ -1,12 +1,22 @@ 'use strict'; const tapeTest = require('tape-catch'); +function promise_fulfills(t, expectedValue, promise, msg) { + promise.then(value => { + t.equal(value, expectedValue, msg); + }, reason => { + t.fail(msg + ': Rejected unexpectedly with: ' + reason); + }); +} + module.exports = (label, factory, chunks) => { function test(description, testFn) { tapeTest(`${label}: ${description}`, testFn); } test('piping with no options and no destination errors', t => { + t.plan(2); + const rs = factory(); const chunksWritten = []; @@ -20,13 +30,16 @@ module.exports = (label, factory, chunks) => { }); rs.pipeTo(ws).then(() => { - t.equal(ws.state, 'closed', 'destination should be closed'); + const writer = ws.getWriter(); + promise_fulfills(t, undefined, writer.closed, 'destination should be closed'); t.deepEqual(chunksWritten, chunks); - t.end(); - }); + }) + .catch(e => t.error(e)); }); test('piping with { preventClose: false } and no destination errors', t => { + t.plan(2); + const rs = factory(); const chunksWritten = []; @@ -40,10 +53,11 @@ module.exports = (label, factory, chunks) => { }); rs.pipeTo(ws).then(() => { - t.equal(ws.state, 'closed', 'destination should be closed'); + const writer = ws.getWriter(); + promise_fulfills(t, undefined, writer.closed, 'destination should be closed'); t.deepEqual(chunksWritten, chunks); - t.end(); - }); + }) + .catch(e => t.error(e)); }); test('piping with { preventClose: true } and no destination errors', t => { @@ -63,10 +77,10 @@ module.exports = (label, factory, chunks) => { }); rs.pipeTo(ws, { preventClose: true }).then(() => { - t.equal(ws.state, 'writable', 'destination should be writable'); t.deepEqual(chunksWritten, chunks); t.end(); - }); + }) + .catch(e => t.error(e)); }); test('piping with { preventClose: false } and a destination with that errors synchronously', t => { diff --git a/reference-implementation/test/transform-stream-errors.js b/reference-implementation/test/transform-stream-errors.js index 2449db025..ba4f803f2 100644 --- a/reference-implementation/test/transform-stream-errors.js +++ b/reference-implementation/test/transform-stream-errors.js @@ -2,7 +2,7 @@ const test = require('tape-catch'); test('TransformStream errors thrown in transform put the writable and readable in an errored state', t => { - t.plan(5); + t.plan(3); const thrownError = new Error('bad things are happening!'); const ts = new TransformStream({ @@ -11,8 +11,6 @@ test('TransformStream errors thrown in transform put the writable and readable i } }); - t.equal(ts.writable.state, 'writable', 'writable starts in writable'); - const reader = ts.readable.getReader(); reader.read().then( @@ -25,21 +23,22 @@ test('TransformStream errors thrown in transform put the writable and readable i e => t.equal(e, thrownError, 'readable\'s closed should be rejected with the thrown error') ); - ts.writable.closed.then( + const writer = ts.writable.getWriter(); + + writer.closed.then( () => t.fail('writable\'s closed should not be fulfilled'), e => t.equal(e, thrownError, 'writable\'s closed should be rejected with the thrown error') ); - ts.writable.write('a'); - t.equal(ts.writable.state, 'waiting', 'writable becomes waiting immediately after throw'); + writer.write('a'); }); test('TransformStream errors thrown in flush put the writable and readable in an errored state', t => { - t.plan(6); + t.plan(3); const thrownError = new Error('bad things are happening!'); const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { done(); }, flush() { @@ -59,14 +58,13 @@ test('TransformStream errors thrown in flush put the writable and readable in an e => t.equal(e, thrownError, 'readable\'s closed should be rejected with the thrown error') ); - ts.writable.closed.then( + const writer = ts.writable.getWriter(); + + writer.closed.then( () => t.fail('writable\'s closed should not be fulfilled'), e => t.equal(e, thrownError, 'writable\'s closed should be rejected with the thrown error') ); - t.equal(ts.writable.state, 'writable', 'writable starts in writable'); - ts.writable.write('a'); - t.equal(ts.writable.state, 'waiting', 'writable becomes waiting after a write'); - ts.writable.close(); - t.equal(ts.writable.state, 'closing', 'writable becomes closing after the close call'); + writer.write('a'); + writer.close(); }); diff --git a/reference-implementation/test/transform-stream.js b/reference-implementation/test/transform-stream.js index e062c2932..baed044d7 100644 --- a/reference-implementation/test/transform-stream.js +++ b/reference-implementation/test/transform-stream.js @@ -29,72 +29,67 @@ test('TransformStream writable starts in the writable state', t => { t.plan(1); const ts = new TransformStream({ transform() { } }); - t.equal(ts.writable.state, 'writable', 'writable starts writable'); + const writer = ts.writable.getWriter(); + t.equal(writer.desiredSize, 1, 'writer.desiredSize should be 1'); }); test('Pass-through sync TransformStream: can read from readable what is put into writable', t => { t.plan(3); const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { enqueue(chunk); done(); } }); - ts.writable.write('a'); + const writer = ts.writable.getWriter(); + writer.write('a'); + t.equal(writer.desiredSize, 0, 'writer.desiredSize should be 0 after write()'); - t.equal(ts.writable.state, 'waiting', 'writable is waiting after one write'); ts.readable.getReader().read().then(result => { t.deepEqual(result, { value: 'a', done: false }, 'result from reading the readable is the same as was written to writable'); - return ts.writable.ready.then(() => { - t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); + return writer.ready.then(() => { + t.equal(writer.desiredSize, 1, 'desiredSize should be 1 again'); }); }) .catch(e => t.error(e)); }); test('Uppercaser sync TransformStream: can read from readable transformed version of what is put into writable', t => { - t.plan(3); - const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { enqueue(chunk.toUpperCase()); done(); } }); - ts.writable.write('a'); - - t.equal(ts.writable.state, 'waiting', 'writable is waiting after one write'); + const writer = ts.writable.getWriter(); + writer.write('a'); ts.readable.getReader().read().then(result => { t.deepEqual(result, { value: 'A', done: false }, 'result from reading the readable is the transformation of what was written to writable'); - - return ts.writable.ready.then(() => { - t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); - }); + t.end(); }) .catch(e => t.error(e)); }); test('Uppercaser-doubler sync TransformStream: can read both chunks put into the readable', t => { - t.plan(4); + t.plan(2); const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { enqueue(chunk.toUpperCase()); enqueue(chunk.toUpperCase()); done(); } }); - ts.writable.write('a'); - - t.equal(ts.writable.state, 'waiting', 'writable is waiting after one write'); + const writer = ts.writable.getWriter(); + writer.write('a'); const reader = ts.readable.getReader(); @@ -103,47 +98,38 @@ test('Uppercaser-doubler sync TransformStream: can read both chunks put into the 'the first chunk read is the transformation of the single chunk written'); return reader.read().then(result2 => { - t.deepEqual(result2, { value: 'A', done: false }, - 'the second chunk read is also the transformation of the single chunk written'); - - return ts.writable.ready.then(() => { - t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); - }); + t.deepEqual(result2, { value: 'A', done: false }, + 'the second chunk read is also the transformation of the single chunk written'); }); }) .catch(e => t.error(e)); }); test('Uppercaser async TransformStream: can read from readable transformed version of what is put into writable', t => { - t.plan(3); + t.plan(1); const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { setTimeout(() => enqueue(chunk.toUpperCase()), 10); setTimeout(done, 50); } }); - ts.writable.write('a'); - - t.equal(ts.writable.state, 'waiting', 'writable is waiting after one write'); + const writer = ts.writable.getWriter(); + writer.write('a'); ts.readable.getReader().read().then(result => { t.deepEqual(result, { value: 'A', done: false }, 'result from reading the readable is the transformation of what was written to writable'); - - return ts.writable.ready.then(() => { - t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); - }); }) .catch(e => t.error(e)); }); test('Uppercaser-doubler async TransformStream: can read both chunks put into the readable', t => { - t.plan(4); + t.plan(2); const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { setTimeout(() => enqueue(chunk.toUpperCase()), 10); setTimeout(() => enqueue(chunk.toUpperCase()), 50); setTimeout(done, 90); @@ -152,52 +138,46 @@ test('Uppercaser-doubler async TransformStream: can read both chunks put into th const reader = ts.readable.getReader(); - ts.writable.write('a'); + const writer = ts.writable.getWriter(); + writer.write('a'); - t.equal(ts.writable.state, 'waiting', 'writable is waiting after one write'); reader.read().then(result1 => { t.deepEqual(result1, { value: 'A', done: false }, 'the first chunk read is the transformation of the single chunk written'); return reader.read().then(result2 => { - t.deepEqual(result2, { value: 'A', done: false }, - 'the second chunk read is also the transformation of the single chunk written'); - - return ts.writable.ready.then(() => { - t.equal(ts.writable.state, 'writable', 'writable becomes writable again'); - }); + t.deepEqual(result2, { value: 'A', done: false }, + 'the second chunk read is also the transformation of the single chunk written'); }); }) .catch(e => t.error(e)); }); test('TransformStream: by default, closing the writable closes the readable (when there are no queued writes)', t => { - t.plan(3); - const ts = new TransformStream({ transform() { } }); - ts.writable.close(); - t.equal(ts.writable.state, 'closing', 'writable is closing'); + const writer = ts.writable.getWriter(); + writer.close(); - Promise.all([ts.writable.closed, ts.readable.getReader().closed]).then(() => { + Promise.all([writer.closed, ts.readable.getReader().closed]).then(() => { t.pass('both writable and readable closed promises fulfill'); - t.equal(ts.writable.state, 'closed', 'writable state becomes closed eventually'); + t.end(); }) .catch(e => t.error(e)); }); test('TransformStream: by default, closing the writable waits for transforms to finish before closing both', t => { - t.plan(4); + t.plan(2); const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { setTimeout(done, 50); } }); - ts.writable.write('a'); - ts.writable.close(); - t.equal(ts.writable.state, 'closing', 'writable is closing'); + const writer = ts.writable.getWriter(); + writer.write('a'); + writer.close(); let rsClosed = false; ts.readable.getReader().closed.then(() => { @@ -207,8 +187,8 @@ test('TransformStream: by default, closing the writable waits for transforms to setTimeout(() => { t.equal(rsClosed, false, 'readable is not closed after a tick'); - ts.writable.closed.then(() => { - t.equal(ts.writable.state, 'closed', 'writable becomes closed eventually'); + writer.closed.then(() => { + // TODO: Is this expectation correct? t.equal(rsClosed, true, 'readable is closed at that point'); }) .catch(e => t.error(e)); @@ -216,58 +196,50 @@ test('TransformStream: by default, closing the writable waits for transforms to }); test('TransformStream: by default, closing the writable closes the readable after sync enqueues and async done', t => { - t.plan(3); - const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { enqueue('x'); enqueue('y'); setTimeout(done, 50); } }); - ts.writable.write('a'); - ts.writable.close(); - t.equal(ts.writable.state, 'closing', 'writable is closing'); - - ts.writable.closed.then(() => { - t.equal(ts.writable.state, 'closed', 'writable becomes closed eventually'); + const writer = ts.writable.getWriter(); + writer.write('a'); + writer.close(); + writer.closed.then(() => { return readableStreamToArray(ts.readable).then(chunks => { t.deepEqual(chunks, ['x', 'y'], 'both enqueued chunks can be read from the readable'); + t.end(); }); }) .catch(e => t.error(e)); }); test('TransformStream: by default, closing the writable closes the readable after async enqueues and async done', t => { - t.plan(3); - const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { setTimeout(() => enqueue('x'), 10); setTimeout(() => enqueue('y'), 50); setTimeout(done, 90); } }); - ts.writable.write('a'); - ts.writable.close(); - t.equal(ts.writable.state, 'closing', 'writable is closing'); - - ts.writable.closed.then(() => { - t.equal(ts.writable.state, 'closed', 'writable becomes closed eventually'); + const writer = ts.writable.getWriter(); + writer.write('a'); + writer.close(); + writer.closed.then(() => { return readableStreamToArray(ts.readable).then(chunks => { t.deepEqual(chunks, ['x', 'y'], 'both enqueued chunks can be read from the readable'); + t.end(); }); }) .catch(e => t.error(e)); }); test('TransformStream flush is called immediately when the writable is closed, if no writes are queued', t => { - t.plan(1); - let flushCalled = false; const ts = new TransformStream({ transform() { }, @@ -276,17 +248,16 @@ test('TransformStream flush is called immediately when the writable is closed, i } }); - ts.writable.close().then(() => { + ts.writable.getWriter().close().then(() => { t.ok(flushCalled, 'closing the writable triggers the transform flush immediately'); + t.end(); }); }); test('TransformStream flush is called after all queued writes finish, once the writable is closed', t => { - t.plan(3); - let flushCalled = false; const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { setTimeout(done, 10); }, flush(enqueue) { @@ -294,8 +265,9 @@ test('TransformStream flush is called after all queued writes finish, once the w } }); - ts.writable.write('a'); - ts.writable.close(); + const writer = ts.writable.getWriter(); + writer.write('a'); + writer.close(); t.notOk(flushCalled, 'closing the writable does not immediately call flush if writes are not finished'); let rsClosed = false; @@ -306,14 +278,13 @@ test('TransformStream flush is called after all queued writes finish, once the w setTimeout(() => { t.ok(flushCalled, 'flush is eventually called'); t.equal(rsClosed, false, 'if flush does not call close, the readable does not become closed'); + t.end(); }, 50); }); test('TransformStream flush gets a chance to enqueue more into the readable', t => { - t.plan(2); - const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { done(); }, flush(enqueue) { @@ -324,13 +295,16 @@ test('TransformStream flush gets a chance to enqueue more into the readable', t const reader = ts.readable.getReader(); - ts.writable.write('a'); - ts.writable.close(); + const writer = ts.writable.getWriter(); + writer.write('a'); + writer.close(); + reader.read().then(result1 => { t.deepEqual(result1, { value: 'x', done: false }, 'the first chunk read is the first one enqueued in flush'); return reader.read().then(result2 => { t.deepEqual(result2, { value: 'y', done: false }, 'the second chunk read is the second one enqueued in flush'); + t.end(); }); }) .catch(e => t.error(e)); @@ -340,7 +314,7 @@ test('TransformStream flush gets a chance to enqueue more into the readable, and t.plan(3); const ts = new TransformStream({ - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { done(); }, flush(enqueue, close) { @@ -352,8 +326,10 @@ test('TransformStream flush gets a chance to enqueue more into the readable, and const reader = ts.readable.getReader(); - ts.writable.write('a'); - ts.writable.close(); + const writer = ts.writable.getWriter(); + writer.write('a'); + writer.close(); + reader.read().then(result1 => { t.deepEqual(result1, { value: 'x', done: false }, 'the first chunk read is the first one enqueued in flush'); @@ -371,10 +347,11 @@ test('TransformStream flush gets a chance to enqueue more into the readable, and test('Transform stream should call transformer methods as methods', t => { t.plan(1); + const ts = new TransformStream({ suffix: '-suffix', - transform(chunk, enqueue, done) { + transform(chunk, done, enqueue) { enqueue(chunk + this.suffix); done(); }, @@ -385,9 +362,11 @@ test('Transform stream should call transformer methods as methods', t => { } }); - ts.writable.write('a'); - ts.writable.close(); - ts.writable.closed.then(() => { + const writer = ts.writable.getWriter(); + writer.write('a'); + writer.close(); + + writer.closed.then(() => { return readableStreamToArray(ts.readable).then(chunks => { t.deepEqual(chunks, ['a-suffix', 'flushed-suffix'], 'both enqueued chunks have suffixes'); }); diff --git a/reference-implementation/test/writable-stream-abort.js b/reference-implementation/test/writable-stream-abort.js index 6808fd45b..0c47d977e 100644 --- a/reference-implementation/test/writable-stream-abort.js +++ b/reference-implementation/test/writable-stream-abort.js @@ -1,43 +1,84 @@ 'use strict'; const test = require('tape-catch'); +function promise_fulfills(t, expectedValue, promise, msg) { + promise.then(value => { + t.equal(value, expectedValue, msg); + }, reason => { + t.fail(msg + ': Rejected unexpectedly with: ' + reason); + }); +} + +test('abort() on a released writer rejects', t => { + const ws = new WritableStream({}); + + const writer = ws.getWriter(); + writer.releaseLock(); + + const abortPromise = writer.abort(); + abortPromise.then(() => { + t.fail('abortPromise fulfilled unexpectedly'); + t.end(); + }, + r => { + t.end(); + }); +}); + test('Aborting a WritableStream immediately prevents future writes', t => { - const chunks = []; const ws = new WritableStream({ - write(chunk) { - chunks.push(chunk); + write() { + t.fail('Unexpected write() call'); + t.end(); } }); setTimeout(() => { - ws.abort(); - ws.write(1); - ws.write(2); - t.deepEqual(chunks, [], 'no chunks are written'); - t.end(); + const writer = ws.getWriter(); + + writer.abort(); + writer.write(1); + writer.write(2); + + setTimeout(() => { + t.end(); + }, 100); }, 0); }); test('Aborting a WritableStream prevents further writes after any that are in progress', t => { - const chunks = []; + t.plan(2); + + let writeCount = 0; + const ws = new WritableStream({ write(chunk) { - chunks.push(chunk); + ++writeCount; + + if (writeCount > 1) { + t.fail('Only the single in-progress chunk gets written to the sink'); + t.end(); + return; + } + + t.equals(chunk, 1, 'chunk should be 1'); + return new Promise(resolve => setTimeout(resolve, 50)); } }); setTimeout(() => { - ws.write(1); - ws.write(2); - ws.write(3); - ws.abort(); - ws.write(4); - ws.write(5); + const writer = ws.getWriter(); + + writer.write(1); + writer.write(2); + writer.write(3); + writer.abort(); + writer.write(4); + writer.write(5); setTimeout(function () { - t.deepEqual(chunks, [1], 'only the single in-progress chunk gets written'); - t.end(); + t.pass('Passed 200 ms'); }, 200); }, 0); }); @@ -50,7 +91,9 @@ test('Fulfillment value of ws.abort() call must be undefined even if the underly } }); - const abortPromise = ws.abort('a'); + const writer = ws.getWriter(); + + const abortPromise = writer.abort('a'); abortPromise.then(value => { t.equal(value, undefined, 'fulfillment value must be undefined'); t.end(); @@ -68,7 +111,9 @@ test('WritableStream if sink\'s abort throws, the promise returned by ws.abort() } }); - const abortPromise = ws.abort(undefined); + const writer = ws.getWriter(); + + const abortPromise = writer.abort(undefined); abortPromise.then( () => { t.fail('abortPromise is fulfilled unexpectedly'); @@ -89,63 +134,65 @@ test('Aborting a WritableStream passes through the given reason', t => { } }); + const writer = ws.getWriter(); + const passedReason = new Error('Sorry, it just wasn\'t meant to be.'); - ws.abort(passedReason); + writer.abort(passedReason); t.equal(recordedReason, passedReason); t.end(); }); test('Aborting a WritableStream puts it in an errored state, with stored error equal to the abort reason', t => { - t.plan(5); + t.plan(4); let recordedReason; const ws = new WritableStream(); - const passedReason = new Error('Sorry, it just wasn\'t meant to be.'); - ws.abort(passedReason); + const writer = ws.getWriter(); - t.equal(ws.state, 'errored', 'state should be errored'); + const passedReason = new Error('Sorry, it just wasn\'t meant to be.'); + writer.abort(passedReason); - ws.write().then( + writer.write().then( () => t.fail('writing should not succeed'), - r => t.equal(r, passedReason, 'writing should reject with the given reason') + r => t.equal(r.constructor, TypeError, 'writing should reject with the given reason') ); - ws.close().then( + writer.close().then( () => t.fail('closing should not succeed'), - r => t.equal(r, passedReason, 'closing should reject with the given reason') + r => t.equal(r.constructor, TypeError, 'closing should reject with the given reason') ); - ws.abort().then( + writer.abort().then( () => t.fail('aborting a second time should not succeed'), - r => t.equal(r, passedReason, 'aborting a second time should reject with the given reason') + r => t.equal(r.constructor, TypeError, 'aborting a second time should reject with the given reason') ); - ws.closed.then( - () => t.fail('closed promise should not be fulfilled'), - r => t.equal(r, passedReason, 'closed promise should be rejected with the given reason') + writer.closed.then( + () => t.fail('closed fulfilled unexpectedly'), + r => t.equal(r.constructor, TypeError, 'closed should reject with a TypeError') ); }); test('Aborting a WritableStream causes any outstanding ready promises to be fulfilled immediately', t => { - t.plan(2); - let recordedReason; const ws = new WritableStream({ write(chunk) { return new Promise(() => { }); // forever-pending, so normally .ready would not fulfill. } }); - ws.write('a'); - t.equal(ws.state, 'waiting', 'state should be waiting'); - ws.ready.then(() => { - t.equal(ws.state, 'errored', 'state should now be errored'); + const writer = ws.getWriter(); + + writer.write('a'); + + writer.ready.then(() => { + t.end(); }); const passedReason = new Error('Sorry, it just wasn\'t meant to be.'); - ws.abort(passedReason); + writer.abort(passedReason); }); test('Aborting a WritableStream causes any outstanding write() promises to be rejected with the abort reason', t => { @@ -153,87 +200,89 @@ test('Aborting a WritableStream causes any outstanding write() promises to be re const ws = new WritableStream(); - ws.write('a').then( + const writer = ws.getWriter(); + + writer.write('a').then( () => t.fail('writing should not succeed'), - r => t.equal(r, passedReason, 'writing should reject with the given reason') + r => t.equal(r.constructor, TypeError, 'writing should reject with a TypeError') ); const passedReason = new Error('Sorry, it just wasn\'t meant to be.'); - ws.abort(passedReason); + writer.abort(passedReason); }); test('Closing but then immediately aborting a WritableStream causes the stream to error', t => { - t.plan(2); + t.plan(1); const ws = new WritableStream(); - ws.close(); + const writer = ws.getWriter(); - const passedReason = new Error('Sorry, it just wasn\'t meant to be.'); - ws.abort(passedReason); + writer.close(); - t.equal(ws.state, 'errored'); + const passedReason = new Error('Sorry, it just wasn\'t meant to be.'); + writer.abort(passedReason); - ws.closed.then( + writer.closed.then( () => t.fail('the stream should not close successfully'), - r => t.equal(r, passedReason, 'the stream should be errored with the given reason') + r => t.equal(r.constructor, TypeError, 'the stream should be errored with a TypeError') ); }); test('Closing a WritableStream and aborting it while it closes causes the stream to error', t => { - t.plan(3); - const ws = new WritableStream({ close() { return new Promise(() => { }); // forever-pending } }); - ws.close(); + const writer = ws.getWriter(); + + writer.close(); const passedReason = new Error('Sorry, it just wasn\'t meant to be.'); setTimeout(() => { - t.equal(ws.state, 'closing'); - - ws.abort(passedReason); - - t.equal(ws.state, 'errored'); + writer.abort(passedReason); }, 20); - ws.closed.then( + writer.closed.then( () => t.fail('the stream should not close successfully'), - r => t.equal(r, passedReason, 'the stream should be errored with the given reason') + r => { + t.equal(r.constructor, TypeError, 'the stream should be errored with a TypeError'); + t.end(); + } ); }); test('Aborting a WritableStream after it is closed is a no-op', t => { - t.plan(3); + t.plan(2); const ws = new WritableStream(); - ws.close(); + const writer = ws.getWriter(); - setTimeout(() => { - t.equal(ws.state, 'closed'); + writer.close(); - ws.abort().then( + setTimeout(() => { + writer.abort().then( v => t.equal(v, undefined, 'abort promise should fulfill with undefined'), t.error ); - t.equal(ws.state, 'closed', 'state stays closed'); + promise_fulfills(t, undefined, writer.closed, 'closed should still be fulfilled'); }, 0); }); test('WritableStream should call underlying sink\'s close if no abort is supplied', t => { - t.plan(1); - const ws = new WritableStream({ close() { t.equal(arguments.length, 0, 'close() was called (with no arguments)'); + t.end(); } }); - ws.abort(); + const writer = ws.getWriter(); + + writer.abort(); }); diff --git a/reference-implementation/test/writable-stream.js b/reference-implementation/test/writable-stream.js index 24d6ed8e8..117c9d1be 100644 --- a/reference-implementation/test/writable-stream.js +++ b/reference-implementation/test/writable-stream.js @@ -1,25 +1,131 @@ 'use strict'; const test = require('tape-catch'); -function writeArrayToStream(array, writableStream) { - array.forEach(chunk => writableStream.write(chunk)); - return writableStream.close(); +function promise_rejects(t, expectedReason, promise, name, msg) { + promise.then(value => { + t.fail(name + ' fulfilled unexpectedly'); + t.end(); + }, reason => { + t.equal(reason, expectedReason, msg); + }); } -test('error argument is given to start method', t => { - let error; +function writeArrayToStream(array, writableStreamWriter) { + array.forEach(chunk => writableStreamWriter.write(chunk)); + return writableStreamWriter.close(); +} + +test('desiredSize on a released writer', t => { + const ws = new WritableStream({}); + const writer = ws.getWriter(); + writer.releaseLock(); + + try { + writer.desiredSize; + } catch(e) { + t.equal(e.constructor, TypeError, 'desiredSize should throw a TypeError'); + t.end(); + return; + } + + t.fail('writer.desiredSize did not throw'); +}); + +test('ws.getWriter() on a closing WritableStream', t => { + const ws = new WritableStream({}); + + const writer = ws.getWriter(); + writer.close(); + writer.releaseLock(); + + ws.getWriter(); + + t.end(); +}); + +test('ws.getWriter() on a closed WritableStream', t => { + const ws = new WritableStream({}); + + const writer = ws.getWriter(); + writer.close().then(() => { + writer.releaseLock(); + + ws.getWriter(); + + t.end(); + }) + .catch(e => t.error(e)); +}); + +test('ws.getWriter() on an aborted WritableStream', t => { + const ws = new WritableStream({}); + + const writer = ws.getWriter(); + writer.abort(); + writer.releaseLock(); + + ws.getWriter(); + + t.end(); +}); + +test('ws.getWriter() on an errored WritableStream', t => { + const ws = new WritableStream({ + start(c) { + c.error(); + } + }); + + const writer = ws.getWriter(); + writer.closed.then( + v => t.error('writer.closed fulfilled unexpectedly with: ' + v), + () => { + writer.releaseLock(); + + ws.getWriter(); + + t.end(); + } + ); +}); + +test('Controller argument is given to start method', t => { + let controller; const ws = new WritableStream({ - start(error_) { - error = error_; + start(c) { + controller = c; } }); // Now error the stream after its construction. const passedError = new Error('horrible things'); - error(passedError); - t.equal(ws.state, 'errored'); - ws.closed.catch(r => { - t.equal(r, passedError); + controller.error(passedError); + + const writer = ws.getWriter(); + + t.equal(writer.desiredSize, null, 'desiredSize should be null'); + writer.closed.catch(r => { + t.equal(r, passedError, 'ws should be errored by passedError'); + t.end(); + }); +}); + +test('highWaterMark', t => { + let controller; + const ws = new WritableStream({ + start(c) { + controller = c; + }, + }, { + highWaterMark: 1000, + size() { return 1; } + }); + + const writer = ws.getWriter(); + + t.equal(writer.desiredSize, 1000, 'desiredSize should be 1000'); + writer.ready.then(v => { + t.equal(v, undefined, 'ready promise should fulfill with undefined'); t.end(); }); }); @@ -34,7 +140,7 @@ test('Underlying sink\'s write won\'t be called until start finishes', t => { }, write(chunk) { if (expectWriteCall) { - t.equal(chunk, 'a'); + t.equal(chunk, 'a', 'chunk should be the value passed to writer.write()'); t.end(); } else { t.fail('Unexpected write call'); @@ -47,8 +153,11 @@ test('Underlying sink\'s write won\'t be called until start finishes', t => { } }); - ws.write('a'); - t.equal(ws.state, 'waiting', `writable stream should be waiting, not ${ws.state}`); + const writer = ws.getWriter(); + + t.equal(writer.desiredSize, 1, 'desiredSize should be 1'); + writer.write('a'); + t.equal(writer.desiredSize, 0, 'desiredSize should be 0 after writer.write()'); // Wait and see that write won't be called. setTimeout(() => { @@ -63,8 +172,7 @@ test('Underlying sink\'s close won\'t be called until start finishes', t => { let resolveStartPromise; const ws = new WritableStream({ start() { - return new Promise( - (resolve, reject) => { resolveStartPromise = resolve; }); + return new Promise(resolve => { resolveStartPromise = resolve; }); }, write(chunk) { t.fail('Unexpected write call'); @@ -79,8 +187,11 @@ test('Underlying sink\'s close won\'t be called until start finishes', t => { } } }); - ws.close('a'); - t.equal(ws.state, 'closing'); + + const writer = ws.getWriter(); + + writer.close('a'); + t.equal(writer.desiredSize, 1, 'desiredSize should be 1'); // Wait and see that write won't be called. setTimeout(() => { @@ -97,7 +208,9 @@ test('Fulfillment value of ws.close() call must be undefined even if the underly } }); - const closePromise = ws.close('a'); + const writer = ws.getWriter(); + + const closePromise = writer.close('a'); closePromise.then(value => { t.equal(value, undefined, 'fulfillment value must be undefined'); t.end(); @@ -125,7 +238,7 @@ test('Underlying sink\'s write or close are never invoked if start throws', t => } }); } catch (e) { - t.equal(e, passedError); + t.equal(e, passedError, 'Constructor should throw passedError'); t.end(); return; } @@ -164,16 +277,18 @@ test('WritableStream instances have the correct methods and properties', t => { const ws = new WritableStream(); - t.equal(typeof ws.write, 'function', 'has a write method'); - t.equal(typeof ws.abort, 'function', 'has an abort method'); - t.equal(typeof ws.close, 'function', 'has a close method'); + const writer = ws.getWriter(); - t.equal(ws.state, 'writable', 'state starts out writable'); + t.equal(typeof writer.write, 'function', 'has a write method'); + t.equal(typeof writer.abort, 'function', 'has an abort method'); + t.equal(typeof writer.close, 'function', 'has a close method'); - t.ok(ws.ready, 'has a ready property'); - t.ok(ws.ready.then, 'ready property is a thenable'); - t.ok(ws.closed, 'has a closed property'); - t.ok(ws.closed.then, 'closed property is thenable'); + t.equal(writer.desiredSize, 1, 'desiredSize starts out 1'); + + t.ok(writer.ready, 'has a ready property'); + t.ok(writer.ready.then, 'ready property is a thenable'); + t.ok(writer.closed, 'has a closed property'); + t.ok(writer.closed.then, 'closed property is thenable'); }); test('WritableStream with simple input, processed asynchronously', t => { @@ -199,8 +314,10 @@ test('WritableStream with simple input, processed asynchronously', t => { } }); + const writer = ws.getWriter(); + const input = [1, 2, 3, 4, 5]; - writeArrayToStream(input, ws).then( + writeArrayToStream(input, writer).then( () => t.deepEqual(storage, input, 'correct data was relayed to underlying sink'), r => t.fail(r) ); @@ -220,8 +337,10 @@ test('WritableStream with simple input, processed synchronously', t => { }, }); + const writer = ws.getWriter(); + const input = [1, 2, 3, 4, 5]; - writeArrayToStream(input, ws).then( + writeArrayToStream(input, writer).then( () => t.deepEqual(storage, input, 'correct data was relayed to underlying sink'), r => t.fail(r) ); @@ -233,9 +352,11 @@ test('WritableStream is writable and ready fulfills immediately if the strategy size() { return 0; } }); - t.equal(ws.state, 'writable'); + const writer = ws.getWriter(); - ws.ready.then(() => { + t.equal(writer.desiredSize, Infinity, 'desiredSize should be Infinity'); + + writer.ready.then(() => { t.pass('ready promise was fulfilled'); t.end(); }); @@ -249,7 +370,9 @@ test('Fulfillment value of ws.write() call must be undefined even if the underly } }); - const writePromise = ws.write('a'); + const writer = ws.getWriter(); + + const writePromise = writer.write('a'); writePromise.then(value => { t.equal(value, undefined, 'fulfillment value must be undefined'); t.end(); @@ -260,66 +383,139 @@ test('Fulfillment value of ws.write() call must be undefined even if the underly }); test('WritableStream transitions to waiting until write is acknowledged', t => { - t.plan(3); + t.plan(6); - let resolveWritePromise; + let resolveSinkWritePromise; const ws = new WritableStream({ write() { - return new Promise(resolve => resolveWritePromise = resolve); + const sinkWritePromise = new Promise(resolve => resolveSinkWritePromise = resolve); + return sinkWritePromise; } }); + const writer = ws.getWriter(); + setTimeout(() => { - t.equal(ws.state, 'writable', 'state starts writable'); - const writePromise = ws.write('a'); - t.equal(ws.state, 'waiting', 'state is waiting until the write finishes'); - resolveWritePromise(); - writePromise.then(() => { - t.equal(ws.state, 'writable', 'state becomes writable again after the write finishes'); + t.equal(writer.desiredSize, 1, 'desiredSize starts 1'); + + writer.ready.then(() => { + const writePromise = writer.write('a'); + t.notEqual(resolveSinkWritePromise, undefined, 'resolveSinkWritePromise should not be undefined'); + + t.equal(writer.desiredSize, 0, 'desiredSize should be 0 after writer.write()'); + + writePromise.then(value => { + if (resolveSinkWritePromise !== undefined) { + t.fail('writePromise fulfilled before sinkWritePromise fulfills'); + t.end(); + return; + } + + t.equals(value, undefined, 'writePromise should be fulfilled with undefined'); + }); + + writer.ready.then(value => { + if (resolveSinkWritePromise !== undefined) { + t.fail('writePromise fulfilled before sinkWritePromise fulfills'); + t.end(); + return; + } + + t.equal(writer.desiredSize, 1, 'desiredSize should be 1 again'); + + t.equals(value, undefined, 'writePromise should be fulfilled with undefined'); + }); + + setTimeout(() => { + resolveSinkWritePromise(); + resolveSinkWritePromise = undefined; + }, 100); }); }, 0); }); test('WritableStream if write returns a rejected promise, queued write and close are cleared', t => { - t.plan(6); + t.plan(9); - let rejectWritePromise; + let sinkWritePromiseRejectors = []; const ws = new WritableStream({ write() { - return new Promise((r, reject) => rejectWritePromise = reject); + const sinkWritePromise = new Promise((r, reject) => sinkWritePromiseRejectors.push(reject)); + return sinkWritePromise; } }); + const writer = ws.getWriter(); + setTimeout(() => { - const writePromise = ws.write('a'); + t.equals(writer.desiredSize, 1, 'desiredSize should be 1'); + + const writePromise = writer.write('a'); + t.equals(sinkWritePromiseRejectors.length, 1, 'There should be 1 rejector'); + t.equals(writer.desiredSize, 0, 'desiredSize should be 0'); - t.notStrictEqual(rejectWritePromise, undefined, 'write is called so rejectWritePromise is set'); + const writePromise2 = writer.write('b'); + t.equals(sinkWritePromiseRejectors.length, 1, 'There should be still 1 rejector'); + t.equals(writer.desiredSize, -1, 'desiredSize should be -1'); - const writePromise2 = ws.write('b'); - const closedPromise = ws.close(); + const closedPromise = writer.close(); - t.equal(ws.state, 'closing', 'state is closing until the close finishes'); + t.equals(writer.desiredSize, -1, 'desiredSize should still be -1'); const passedError = new Error('horrible things'); - rejectWritePromise(passedError); + + closedPromise.then( + () => { + t.fail('closedPromise is fulfilled unexpectedly'); + t.end(); + }, + r => { + if (sinkWritePromiseRejectors.length > 0) { + t.fail('closedPromise rejected before sinkWritePromise rejects'); + t.end(); + return; + } + + t.equal(r, passedError, 'closedPromise should reject with passedError'); + } + ); writePromise.then( - () => t.fail('writePromise is fulfilled unexpectedly'), + () => { + t.fail('writePromise is fulfilled unexpectedly'); + t.end(); + }, + r => { + if (sinkWritePromiseRejectors.length > 0) { + t.fail('writePromise2 rejected before sinkWritePromise rejects'); + t.end(); + return; + } + + t.equal(r, passedError, 'writePromise should reject with passedError'); + } + ); + + writePromise2.then( + () => { + t.fail('writePromise2 is fulfilled unexpectedly'); + t.end(); + }, r => { - t.equal(r, passedError); - t.equal(ws.state, 'errored', 'state is errored as the sink called error'); - - writePromise2.then( - () => t.fail('writePromise2 is fulfilled unexpectedly'), - r => t.equal(r, passedError) - ); - - closedPromise.then( - () => t.fail('closedPromise is fulfilled unexpectedly'), - r => t.equal(r, passedError) - ); + if (sinkWritePromiseRejectors.length > 0) { + t.fail('writePromise2 rejected before sinkWritePromise rejects'); + t.end(); + return; + } + + t.equal(r, passedError, 'writePromise2 should reject with passedError'); } ); + + setTimeout(() => { + sinkWritePromiseRejectors[0](passedError); + sinkWritePromiseRejectors = []; + }, 100); }, 0); }); @@ -335,42 +531,60 @@ test('If close is called on a WritableStream in writable state, ready will retur } }); + const writer = ws.getWriter(); + // Wait for ws to start. setTimeout(() => { - t.equal(ws.state, 'writable', 'state must be writable'); + writer.ready.then(v => { + t.equal(writer.desiredSize, 1, 'desiredSize should be 1'); - ws.close(); - t.equal(ws.state, 'closing', 'state must become closing synchronously on close call'); + writer.close(); + t.equal(writer.desiredSize, 1, 'desiredSize should be still 1'); - ws.ready.then(v => { - t.equal(ws.state, 'closed', 'state must be closed by the time ready fulfills (because microtasks ordering)'); - t.equal(v, undefined, 'ready promise was fulfilled with undefined'); - t.end(); + writer.ready.then(v => { + t.equal(v, undefined, 'ready promise was fulfilled with undefined'); + t.end(); + }); }); }, 0); }); -test('If close is called on a WritableStream in waiting state, ready will return a fulfilled promise', t => { +test('If close is called on a WritableStream in waiting state, ready promise will fulfill', t => { const ws = new WritableStream({ + write() { + return new Promise(() => {}); + }, abort() { t.fail('Unexpected abort call'); t.end(); } }); + const writer = ws.getWriter(); + // Wait for ws to start. setTimeout(() => { - ws.write('a'); - t.equal(ws.state, 'waiting', 'state must become waiting synchronously on write call'); + writer.write('a'); + + t.equal(writer.desiredSize, 0, 'desiredSize should be 0'); - ws.close(); - t.equal(ws.state, 'closing', 'state must become closing synchronously on close call'); + let closeCalled = false; + + writer.ready.then(v => { + if (closeCalled === false) { + t.fail('ready fulfilled before writer.close()'); + t.end(); + return; + } - ws.ready.then(v => { - t.equal(ws.state, 'closing', 'state must still be closing when ready fulfills'); t.equal(v, undefined, 'ready promise was fulfilled with undefined'); t.end(); }); + + setTimeout(() => { + writer.close(); + closeCalled = true; + }, 100); }, 0); }); @@ -395,15 +609,14 @@ test('If close is called on a WritableStream in waiting state, ready will be ful // Wait for ws to start. setTimeout(() => { - ws.write('a'); - t.equal(ws.state, 'waiting', 'state must become waiting synchronously on write call'); + const writer = ws.getWriter(); + + writer.write('a'); - ws.close(); - t.equal(ws.state, 'closing', 'state must become closing synchronously on close call'); + writer.close(); - ws.ready.then(v => { + writer.ready.then(v => { readyFulfilledAlready = true; - t.equal(ws.state, 'closing', 'state must still be closing when ready fulfills'); t.equal(v, undefined, 'ready promise was fulfilled with undefined'); t.end(); }); @@ -411,30 +624,30 @@ test('If close is called on a WritableStream in waiting state, ready will be ful }); test('If sink rejects on a WritableStream in writable state, ready will return a fulfilled promise', t => { - t.plan(5); - - let rejectWritePromise; + let rejectSinkWritePromise; const ws = new WritableStream({ write() { - return new Promise((r, reject) => rejectWritePromise = reject); + return new Promise((r, reject) => rejectSinkWritePromise = reject); } }); setTimeout(() => { - t.equal(ws.state, 'writable', 'state is writable to begin'); - const writePromise = ws.write('a'); - t.equal(ws.state, 'waiting', 'state is waiting after a write'); + const writer = ws.getWriter(); + + const writePromise = writer.write('a'); const passedError = new Error('pass me'); - rejectWritePromise(passedError); + rejectSinkWritePromise(passedError); writePromise.then( - () => t.fail('write promise was unexpectedly fulfilled'), + () => { + t.fail('write promise was unexpectedly fulfilled'); + t.end(); + }, r => { t.equal(r, passedError, 'write() should be rejected with the passed error'); - t.equal(ws.state, 'errored', 'state is errored as error is called'); - ws.ready.then(v => { + writer.ready.then(v => { t.equal(v, undefined, 'ready promise was fulfilled with undefined'); t.end(); }); @@ -461,8 +674,9 @@ test('WritableStream if sink\'s close throws', t => { // Wait for ws to start. setTimeout(() => { - const closedPromise = ws.close(); - t.equal(ws.state, 'closing', 'state must become closing synchronously on close call'); + const writer = ws.getWriter(); + + const closedPromise = writer.close(); closedPromise.then( () => { @@ -470,10 +684,9 @@ test('WritableStream if sink\'s close throws', t => { t.end(); }, r => { - t.equal(ws.state, 'errored', 'state must be errored as error is called'); t.equal(r, passedError, 'close() should be rejected with the passed error'); - ws.ready.then(v => { + writer.ready.then(v => { t.equal(v, undefined, 'ready promise was fulfilled with undefined'); t.end(); }); @@ -500,8 +713,9 @@ test('WritableStream if the promise returned by sink\'s close rejects', t => { // Wait for ws to start. setTimeout(() => { - const closedPromise = ws.close(); - t.equal(ws.state, 'closing', 'state must become closing synchronously on close call'); + const writer = ws.getWriter(); + + const closedPromise = writer.close(); closedPromise.then( () => { @@ -509,10 +723,9 @@ test('WritableStream if the promise returned by sink\'s close rejects', t => { t.end(); }, r => { - t.equal(ws.state, 'errored', 'state must be errored as error is called'); t.equal(r, passedError, 'close() should be rejected with the passed error'); - ws.ready.then(v => { + writer.ready.then(v => { t.equal(v, undefined, 'ready promise was fulfilled with undefined'); t.end(); }); @@ -521,9 +734,7 @@ test('WritableStream if the promise returned by sink\'s close rejects', t => { }, 0); }); -test('If sink rejects on a WritableStream in waiting state, ready will return a rejected promise', t => { - t.plan(5); - +test('If sink\'s write rejects on a WritableStream in waiting state, ready will return a rejected promise', t => { const passedError = new Error('pass me'); const ws = new WritableStream({ write(chunk) { @@ -535,19 +746,21 @@ test('If sink rejects on a WritableStream in waiting state, ready will return a }); setTimeout(() => { - ws.write('first chunk succeeds'); - t.equal(ws.state, 'waiting', 'state is waiting after first write'); + const writer = ws.getWriter(); - const secondWritePromise = ws.write('all other chunks fail'); - t.equal(ws.state, 'waiting', 'state is waiting after a second write'); + writer.write('first chunk succeeds'); + + const secondWritePromise = writer.write('all other chunks fail'); secondWritePromise.then( - () => t.fail('write promise was unexpectedly fulfilled'), + () => { + t.fail('write promise was unexpectedly fulfilled'); + t.end(); + }, r => { t.equal(r, passedError, 'write() should be rejected with the passed error'); - t.equal(ws.state, 'errored', 'state is errored as error is called'); - ws.ready.then(v => { + writer.ready.then(v => { t.equal(v, undefined, 'ready promise was fulfilled with undefined'); t.end(); }); @@ -556,9 +769,8 @@ test('If sink rejects on a WritableStream in waiting state, ready will return a }, 0); }); -test('WritableStream if sink throws an error inside write, the stream becomes errored and the promise rejects', t => { - t.plan(3); - +test('WritableStream if sink\'s write throws an error inside write, the stream becomes errored and the promise ' + + 'rejects', t => { const thrownError = new Error('throw me'); const ws = new WritableStream({ write() { @@ -566,22 +778,31 @@ test('WritableStream if sink throws an error inside write, the stream becomes er } }); - ws.write('a').then( - () => t.fail('write promise was unexpectedly fulfilled'), + const writer = ws.getWriter(); + + writer.write('a').then( + () => { + t.fail('write promise was unexpectedly fulfilled'); + t.end(); + }, r => { t.equal(r, thrownError, 'write() should reject with the thrown error'); - t.equal(ws.state, 'errored', 'state is errored'); - ws.close().then( - () => t.fail('close() is fulfilled unexpectedly'), - r => t.equal(r, thrownError, 'close() should be rejected with the thrown error') + writer.close().then( + () => { + t.fail('close() is fulfilled unexpectedly'); + }, + r => { + t.equal(r.constructor, TypeError, 'close() should be rejected'); + t.end(); + } ); } ); }); -test('WritableStream if sink throws an error while closing, the stream becomes errored', t => { - t.plan(3); +test('WritableStream if sink\'s close throws an error while closing, the stream becomes errored', t => { + t.plan(2); const thrownError = new Error('throw me'); const ws = new WritableStream({ @@ -590,79 +811,70 @@ test('WritableStream if sink throws an error while closing, the stream becomes e } }); - ws.close().then( - () => t.fail('close promise is fulfilled unexpectedly'), - r => { - t.equal(r, thrownError, 'close promise should be rejected with the thrown error'); - t.equal(ws.state, 'errored', 'state is errored after calling close'); - } - ); + const writer = ws.getWriter(); + + promise_rejects( + t, thrownError, writer.close(), 'close promise', 'close promise should be rejected with the thrown error'); setTimeout(() => { - t.equal(ws.state, 'errored', 'state stays errored'); + promise_rejects(t, thrownError, writer.closed, 'closed', 'closed should stay rejected'); }, 0); }); test('WritableStream if sink calls error while asynchronously closing, the stream becomes errored', t => { - t.plan(3); + t.plan(2); const passedError = new Error('error me'); - let error; + let controller; const ws = new WritableStream({ - start(error_) { - error = error_; + start(c) { + controller = c; }, close() { return new Promise(resolve => setTimeout(resolve, 50)); } }); - ws.close(); - setTimeout(() => error(passedError), 10); + const writer = ws.getWriter(); - ws.closed.then( - () => t.fail('closed promise is fulfilled unexpectedly'), - r => { - t.equal(r, passedError, 'closed promise should be rejected with the passed error'); - t.equal(ws.state, 'errored', 'state is errored'); - } - ); + writer.close(); + setTimeout(() => controller.error(passedError), 10); + + promise_rejects( + t, passedError, writer.closed, 'closed promise', 'closed promise should be rejected with the passed error'); setTimeout(() => { - t.equal(ws.state, 'errored', 'state stays errored'); + promise_rejects(t, passedError, writer.closed, 'closed', 'closed should stay rejected'); }, 70); }); test('WritableStream if sink calls error while closing with no asynchrony, the stream becomes errored', t => { - t.plan(3); + t.plan(2); const passedError = new Error('error me'); - let error; + let controller; const ws = new WritableStream({ - start(error_) { - error = error_; + start(c) { + controller = c; }, close() { - error(passedError); + controller.error(passedError); } }); - ws.close().then( - () => t.fail('close promise is fulfilled unexpectedly'), - r => { - t.equal(r, passedError, 'close promise should be rejected with the passed error'); - t.equal(ws.state, 'errored', 'state is errored'); - } - ); + const writer = ws.getWriter(); + + promise_rejects( + t, passedError, writer.close(), 'close promise', 'close promise should be rejected with the passed error'); setTimeout(() => { - t.equal(ws.state, 'errored', 'state stays errored'); + promise_rejects(t, passedError, writer.closed, 'closed', 'closed should stay rejected'); }, 0); }); test('WritableStream queue lots of data and have all of them processed at once', t => { - t.plan(4); + t.plan(2); const numberOfWrites = 10000; @@ -678,19 +890,19 @@ test('WritableStream queue lots of data and have all of them processed at once', }); setTimeout(() => { - let writePromise; - for (let i = 0; i < numberOfWrites; ++i) { - writePromise = ws.write('a'); + const writer = ws.getWriter(); + + for (let i = 1; i < numberOfWrites; ++i) { + writer.write('a'); } + const writePromise = writer.write('a') - t.equal(ws.state, 'waiting', 'state is waiting since the queue is full of writeRecords'); t.equal(writeCount, 1, 'should have called sink\'s write once'); resolveFirstWritePromise(); writePromise.then( () => { - t.equal(ws.state, 'writable', 'state is writable again since all writeRecords is done now'); t.equal(writeCount, numberOfWrites, `should have called sink's write ${numberOfWrites} times`); }, t.ifError @@ -724,9 +936,12 @@ test('WritableStream should call underlying sink methods as methods', t => { theSink.debugName = "the sink object passed to the constructor"; const ws = new WritableStream(theSink); - ws.write('a'); - ws.close(); + const writer = ws.getWriter(); + + writer.write('a'); + writer.close(); const ws2 = new WritableStream(theSink); - ws2.abort(); + const writer2 = ws2.getWriter(); + writer2.abort(); });