diff --git a/src/file.ts b/src/file.ts index 6eed2cabd..d949106a0 100644 --- a/src/file.ts +++ b/src/file.ts @@ -29,14 +29,7 @@ import * as crypto from 'crypto'; import * as fs from 'fs'; import * as mime from 'mime'; import * as resumableUpload from './resumable-upload'; -import { - Writable, - Readable, - pipeline, - Transform, - PassThrough, - PipelineSource, -} from 'stream'; +import {Writable, Readable, pipeline, Transform, PassThrough} from 'stream'; import * as zlib from 'zlib'; import * as http from 'http'; @@ -100,8 +93,6 @@ export interface PolicyDocument { signature: string; } -export type SaveData = string | Buffer | PipelineSource; - export type GenerateSignedPostPolicyV2Response = [PolicyDocument]; export interface GenerateSignedPostPolicyV2Callback { @@ -463,7 +454,6 @@ export enum FileExceptionMessages { UPLOAD_MISMATCH = `The uploaded data did not match the data from the server. As a precaution, the file has been deleted. To be sure the content is the same, you should try uploading the file again.`, - STREAM_NOT_READABLE = 'Stream must be readable.', } /** @@ -3567,9 +3557,13 @@ class File extends ServiceObject { this.copy(newFile, copyOptions, callback!); } - save(data: SaveData, options?: SaveOptions): Promise; - save(data: SaveData, callback: SaveCallback): void; - save(data: SaveData, options: SaveOptions, callback: SaveCallback): void; + save(data: string | Buffer, options?: SaveOptions): Promise; + save(data: string | Buffer, callback: SaveCallback): void; + save( + data: string | Buffer, + options: SaveOptions, + callback: SaveCallback + ): void; /** * @typedef {object} SaveOptions * @extends CreateWriteStreamOptions @@ -3596,7 +3590,7 @@ class File extends ServiceObject { * resumable feature is disabled. *

* - * @param {SaveData} data The data to write to a file. + * @param {string | Buffer} data The data to write to a file. * @param {SaveOptions} [options] See {@link File#createWriteStream}'s `options` * parameter. * @param {SaveCallback} [callback] Callback function. @@ -3624,7 +3618,7 @@ class File extends ServiceObject { * ``` */ save( - data: SaveData, + data: string | Buffer, optionsOrCallback?: SaveOptions | SaveCallback, callback?: SaveCallback ): Promise | void { @@ -3644,68 +3638,28 @@ class File extends ServiceObject { } const returnValue = retry( async (bail: (err: Error) => void) => { - if (data instanceof Readable) { - // Make sure any pending async readable operations are finished before - // attempting to check if the stream is readable. - await new Promise(resolve => setImmediate(resolve)); - - if (!data.readable || data.destroyed) { - // Calling pipeline() with a non-readable stream will result in the - // callback being called without an error, and no piping taking - // place. In that case, file.save() would appear to succeed, but - // nothing would be uploaded. - return bail(new Error(FileExceptionMessages.STREAM_NOT_READABLE)); - } - } - - return new Promise((resolve, reject) => { + await new Promise((resolve, reject) => { if (maxRetries === 0) { this.storage.retryOptions.autoRetry = false; } - const writable = this.createWriteStream(options); - - if (options.onUploadProgress) { - writable.on('progress', options.onUploadProgress); - } - - const handleError = (err: Error) => { - if ( - !this.storage.retryOptions.autoRetry || - !this.storage.retryOptions.retryableErrorFn!(err) - ) { - bail(err); - } - - reject(err); - }; - - if (typeof data === 'string' || Buffer.isBuffer(data)) { - writable - .on('error', handleError) - .on('finish', () => resolve()) - .end(data); - } else { - pipeline(data, writable, err => { - if (err) { - // If data is not a valid PipelineSource, then pipeline will - // fail without destroying the writable stream. If data is a - // PipelineSource that yields invalid chunks (e.g. a stream in - // object mode or an iterable that does not yield Buffers or - // strings), then pipeline will destroy the writable stream. - if (!writable.destroyed) writable.destroy(); - - if (typeof data !== 'function') { - // Only PipelineSourceFunction can be retried. Async-iterables - // and Readable streams can only be consumed once. - bail(err); - } - - handleError(err); + const writable = this.createWriteStream(options) + .on('error', err => { + if ( + this.storage.retryOptions.autoRetry && + this.storage.retryOptions.retryableErrorFn!(err) + ) { + return reject(err); } else { - resolve(); + return bail(err); } + }) + .on('finish', () => { + return resolve(); }); + if (options.onUploadProgress) { + writable.on('progress', options.onUploadProgress); } + writable.end(data); }); }, { diff --git a/test/file.ts b/test/file.ts index 567cf77d4..aa050254e 100644 --- a/test/file.ts +++ b/test/file.ts @@ -4275,197 +4275,6 @@ describe('File', () => { await file.save(DATA, options, assert.ifError); }); - it('should save a Readable with no errors', done => { - const options = {resumable: false}; - file.createWriteStream = () => { - const writeStream = new PassThrough(); - writeStream.on('data', data => { - assert.strictEqual(data.toString(), DATA); - }); - writeStream.once('finish', done); - return writeStream; - }; - - const readable = new Readable({ - read() { - this.push(DATA); - this.push(null); - }, - }); - - void file.save(readable, options); - }); - - it('should propagate Readable errors', done => { - const options = {resumable: false}; - file.createWriteStream = () => { - const writeStream = new PassThrough(); - let errorCalled = false; - writeStream.on('data', data => { - assert.strictEqual(data.toString(), DATA); - }); - writeStream.on('error', err => { - errorCalled = true; - assert.strictEqual(err.message, 'Error!'); - }); - writeStream.on('finish', () => { - assert.ok(errorCalled); - }); - return writeStream; - }; - - const readable = new Readable({ - read() { - setTimeout(() => { - this.push(DATA); - this.destroy(new Error('Error!')); - }, 50); - }, - }); - - file.save(readable, options, (err: Error) => { - assert.strictEqual(err.message, 'Error!'); - done(); - }); - }); - - it('Readable upload should not retry', async () => { - const options = {resumable: false}; - - let retryCount = 0; - - file.createWriteStream = () => { - retryCount++; - return new Transform({ - transform( - chunk: string | Buffer, - _encoding: string, - done: Function - ) { - this.push(chunk); - setTimeout(() => { - done(new HTTPError('retryable error', 408)); - }, 5); - }, - }); - }; - try { - const readable = new Readable({ - read() { - this.push(DATA); - this.push(null); - }, - }); - - await file.save(readable, options); - throw Error('unreachable'); - } catch (e) { - assert.strictEqual((e as Error).message, 'retryable error'); - assert.ok(retryCount === 1); - } - }); - - it('Destroyed Readable upload should throw', async () => { - const options = {resumable: false}; - - file.createWriteStream = () => { - throw new Error('unreachable'); - }; - try { - const readable = new Readable({ - read() { - this.push(DATA); - this.push(null); - }, - }); - - readable.destroy(); - - await file.save(readable, options); - } catch (e) { - assert.strictEqual( - (e as Error).message, - FileExceptionMessages.STREAM_NOT_READABLE - ); - } - }); - - it('should save a generator with no error', done => { - const options = {resumable: false}; - file.createWriteStream = () => { - const writeStream = new PassThrough(); - writeStream.on('data', data => { - assert.strictEqual(data.toString(), DATA); - done(); - }); - return writeStream; - }; - - const generator = async function* (arg?: {signal?: AbortSignal}) { - await new Promise(resolve => setTimeout(resolve, 5)); - if (arg?.signal?.aborted) return; - yield DATA; - }; - - void file.save(generator, options); - }); - - it('should propagate async iterable errors', done => { - const options = {resumable: false}; - file.createWriteStream = () => { - const writeStream = new PassThrough(); - let errorCalled = false; - writeStream.on('data', data => { - assert.strictEqual(data.toString(), DATA); - }); - writeStream.on('error', err => { - errorCalled = true; - assert.strictEqual(err.message, 'Error!'); - }); - writeStream.on('finish', () => { - assert.ok(errorCalled); - }); - return writeStream; - }; - - const generator = async function* () { - yield DATA; - throw new Error('Error!'); - }; - - file.save(generator(), options, (err: Error) => { - assert.strictEqual(err.message, 'Error!'); - done(); - }); - }); - - it('should error on invalid async iterator data', done => { - const options = {resumable: false}; - file.createWriteStream = () => { - const writeStream = new PassThrough(); - let errorCalled = false; - writeStream.on('error', () => { - errorCalled = true; - }); - writeStream.on('finish', () => { - assert.ok(errorCalled); - }); - return writeStream; - }; - - const generator = async function* () { - yield {thisIsNot: 'a buffer or a string'}; - }; - - file.save(generator(), options, (err: Error) => { - assert.strictEqual( - err.message, - 'The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object' - ); - done(); - }); - }); - it('buffer upload should retry on first failure', async () => { const options = { resumable: false,