diff --git a/src/file.ts b/src/file.ts index 1237fd01d4..9847239603 100644 --- a/src/file.ts +++ b/src/file.ts @@ -31,7 +31,14 @@ import * as extend from 'extend'; import * as fs from 'fs'; import * as mime from 'mime'; import * as resumableUpload from './resumable-upload'; -import {Writable, Readable, pipeline, Transform, PassThrough} from 'stream'; +import { + Writable, + Readable, + pipeline, + Transform, + PassThrough, + PipelineSource, +} from 'stream'; import * as zlib from 'zlib'; import * as http from 'http'; @@ -93,6 +100,8 @@ export interface PolicyDocument { signature: string; } +export type SaveData = string | Buffer | PipelineSource; + export type GenerateSignedPostPolicyV2Response = [PolicyDocument]; export interface GenerateSignedPostPolicyV2Callback { @@ -419,6 +428,7 @@ export enum FileExceptionMessages { UPLOAD_MISMATCH = `The uploaded data did not match the data from the server. As a precaution, the file has been deleted. To be sure the content is the same, you should try uploading the file again.`, + STREAM_NOT_READABLE = 'Stream must be readable.', } /** @@ -3527,13 +3537,9 @@ class File extends ServiceObject { this.copy(newFile, copyOptions, callback!); } - save(data: string | Buffer, options?: SaveOptions): Promise; - save(data: string | Buffer, callback: SaveCallback): void; - save( - data: string | Buffer, - options: SaveOptions, - callback: SaveCallback - ): void; + save(data: SaveData, options?: SaveOptions): Promise; + save(data: SaveData, callback: SaveCallback): void; + save(data: SaveData, options: SaveOptions, callback: SaveCallback): void; /** * @typedef {object} SaveOptions * @extends CreateWriteStreamOptions @@ -3560,7 +3566,7 @@ class File extends ServiceObject { * resumable feature is disabled. *

* - * @param {string | Buffer} data The data to write to a file. + * @param {SaveData} data The data to write to a file. * @param {SaveOptions} [options] See {@link File#createWriteStream}'s `options` * parameter. * @param {SaveCallback} [callback] Callback function. @@ -3588,7 +3594,7 @@ class File extends ServiceObject { * ``` */ save( - data: string | Buffer, + data: SaveData, optionsOrCallback?: SaveOptions | SaveCallback, callback?: SaveCallback ): Promise | void { @@ -3608,28 +3614,60 @@ class File extends ServiceObject { } const returnValue = retry( async (bail: (err: Error) => void) => { - await new Promise((resolve, reject) => { + if (data instanceof Readable && !data.readable) { + // Readable streams can only be consumed once and would cause a dead + // lock when used with pipeline(). + return bail(new Error(FileExceptionMessages.STREAM_NOT_READABLE)); + } + + return new Promise((resolve, reject) => { if (maxRetries === 0) { this.storage.retryOptions.autoRetry = false; } - const writable = this.createWriteStream(options) - .on('error', err => { - if ( - this.storage.retryOptions.autoRetry && - this.storage.retryOptions.retryableErrorFn!(err) - ) { - return reject(err); + const writable = this.createWriteStream(options); + + if (options.onUploadProgress) { + writable.on('progress', options.onUploadProgress); + } + + const handleError = (err: Error) => { + if ( + !this.storage.retryOptions.autoRetry || + !this.storage.retryOptions.retryableErrorFn!(err) + ) { + bail(err); + } + + reject(err); + }; + + if (typeof data === 'string' || Buffer.isBuffer(data)) { + writable + .on('error', handleError) + .on('finish', () => resolve()) + .end(data); + } else { + pipeline(data, writable, err => { + if (err) { + // If data is not a valid PipelineSource, then pipeline will + // fail without destroying the writable stream. If data is a + // PipelineSource that yields invalid chunks (e.g. a stream in + // object mode or an iterable that does not yield Buffers or + // strings), then pipeline will destroy the writable stream. + if (!writable.destroyed) writable.destroy(); + + if (typeof data !== 'function') { + // Only PipelineSourceFunction can be retried. Async-iterables + // and Readable streams can only be consumed once. + bail(err); + } + + handleError(err); } else { - return bail(err); + resolve(); } - }) - .on('finish', () => { - return resolve(); }); - if (options.onUploadProgress) { - writable.on('progress', options.onUploadProgress); } - writable.end(data); }); }, { diff --git a/test/file.ts b/test/file.ts index 14b493f229..27532953b5 100644 --- a/test/file.ts +++ b/test/file.ts @@ -4314,6 +4314,197 @@ describe('File', () => { await file.save(DATA, options, assert.ifError); }); + it('should save a Readable with no errors', done => { + const options = {resumable: false}; + file.createWriteStream = () => { + const writeStream = new PassThrough(); + writeStream.on('data', data => { + assert.strictEqual(data.toString(), DATA); + }); + writeStream.once('finish', done); + return writeStream; + }; + + const readable = new Readable({ + read() { + this.push(DATA); + this.push(null); + }, + }); + + void file.save(readable, options); + }); + + it('should propagate Readable errors', done => { + const options = {resumable: false}; + file.createWriteStream = () => { + const writeStream = new PassThrough(); + let errorCalled = false; + writeStream.on('data', data => { + assert.strictEqual(data.toString(), DATA); + }); + writeStream.on('error', err => { + errorCalled = true; + assert.strictEqual(err.message, 'Error!'); + }); + writeStream.on('finish', () => { + assert.ok(errorCalled); + }); + return writeStream; + }; + + const readable = new Readable({ + read() { + setTimeout(() => { + this.push(DATA); + this.destroy(new Error('Error!')); + }, 50); + }, + }); + + file.save(readable, options, (err: Error) => { + assert.strictEqual(err.message, 'Error!'); + done(); + }); + }); + + it('Readable upload should not retry', async () => { + const options = {resumable: false}; + + let retryCount = 0; + + file.createWriteStream = () => { + retryCount++; + return new Transform({ + transform( + chunk: string | Buffer, + _encoding: string, + done: Function + ) { + this.push(chunk); + setTimeout(() => { + done(new HTTPError('retryable error', 408)); + }, 5); + }, + }); + }; + try { + const readable = new Readable({ + read() { + this.push(DATA); + this.push(null); + }, + }); + + await file.save(readable, options); + throw Error('unreachable'); + } catch (e) { + assert.strictEqual((e as Error).message, 'retryable error'); + assert.ok(retryCount === 1); + } + }); + + it('Destroyed Readable upload should throw', async () => { + const options = {resumable: false}; + + file.createWriteStream = () => { + throw new Error('unreachable'); + }; + try { + const readable = new Readable({ + read() { + this.push(DATA); + this.push(null); + }, + }); + + readable.destroy(); + + await file.save(readable, options); + } catch (e) { + assert.strictEqual( + (e as Error).message, + FileExceptionMessages.STREAM_NOT_READABLE + ); + } + }); + + it('should save a generator with no error', done => { + const options = {resumable: false}; + file.createWriteStream = () => { + const writeStream = new PassThrough(); + writeStream.on('data', data => { + assert.strictEqual(data.toString(), DATA); + done(); + }); + return writeStream; + }; + + const generator = async function* (arg?: {signal?: AbortSignal}) { + await new Promise(resolve => setTimeout(resolve, 5)); + if (arg?.signal?.aborted) return; + yield DATA; + }; + + void file.save(generator, options); + }); + + it('should propagate async iterable errors', done => { + const options = {resumable: false}; + file.createWriteStream = () => { + const writeStream = new PassThrough(); + let errorCalled = false; + writeStream.on('data', data => { + assert.strictEqual(data.toString(), DATA); + }); + writeStream.on('error', err => { + errorCalled = true; + assert.strictEqual(err.message, 'Error!'); + }); + writeStream.on('finish', () => { + assert.ok(errorCalled); + }); + return writeStream; + }; + + const generator = async function* () { + yield DATA; + throw new Error('Error!'); + }; + + file.save(generator(), options, (err: Error) => { + assert.strictEqual(err.message, 'Error!'); + done(); + }); + }); + + it('should error on invalid async iterator data', done => { + const options = {resumable: false}; + file.createWriteStream = () => { + const writeStream = new PassThrough(); + let errorCalled = false; + writeStream.on('error', () => { + errorCalled = true; + }); + writeStream.on('finish', () => { + assert.ok(errorCalled); + }); + return writeStream; + }; + + const generator = async function* () { + yield {thisIsNot: 'a buffer or a string'}; + }; + + file.save(generator(), options, (err: Error) => { + assert.strictEqual( + err.message, + 'The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object' + ); + done(); + }); + }); + it('buffer upload should retry on first failure', async () => { const options = { resumable: false,