From 68a4878ab7decd161225de8eb9da3f803d3085fb Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Wed, 15 Jul 2015 11:46:05 -0400 Subject: [PATCH] core: add retry logic to read streams --- lib/common/util.js | 167 +++++++++++------- lib/storage/file.js | 175 ++++++++---------- package.json | 2 + system-test/storage.js | 1 + test/common/util.js | 389 ++++++++++++++++++++++++----------------- test/storage/file.js | 99 ++++------- 6 files changed, 436 insertions(+), 397 deletions(-) diff --git a/lib/common/util.js b/lib/common/util.js index 6c89bea50f7..9b690379842 100644 --- a/lib/common/util.js +++ b/lib/common/util.js @@ -29,6 +29,9 @@ var request = require('request').defaults({ maxSockets: Infinity } }); +var retryRequest = require('retry-request'); +var streamForward = require('stream-forward'); +var through = require('through2'); var uuid = require('node-uuid'); /** @const {object} gcloud-node's package.json file. */ @@ -178,34 +181,68 @@ nodeutil.inherits(ApiError, Error); */ function handleResp(err, resp, body, callback) { callback = callback || noop; + if (err) { callback(err); return; } - if (typeof body === 'string') { - try { - body = JSON.parse(body); - } catch(err) {} - } - if (body && body.error) { - // Error from JSON api. - callback(new ApiError(body.error)); + + var parsedApiResponse = util.parseApiResp(resp, body); + + if (parsedApiResponse.err) { + callback(parsedApiResponse.err); return; } - if (resp && (resp.statusCode < 200 || resp.statusCode > 299)) { + + callback(null, parsedApiResponse.body, parsedApiResponse.resp); +} + +util.handleResp = handleResp; + +/** + * From an HTTP response, generate an error if one occurred. + * + * @param {*} resp - Response value. + * @param {*=} body - Body value. + * @return {object} parsedResponse - The parsed response. + * @param {?error} parsedResponse.err - An error detected. + * @param {object} parsedResponse.resp - The original response object. + * @param {*} parsedREsponse.body - The original body value provided will try to + * be JSON.parse'd. If it's successful, the parsed value will be returned + * here, otherwise the original value. + */ +function parseApiResp(resp, body) { + var parsedResponse = { + err: null, + resp: resp, + body: body + }; + + if (resp.statusCode < 200 || resp.statusCode > 299) { // Unknown error. Format according to ApiError standard. - callback(new ApiError({ + parsedResponse.err = new ApiError({ errors: [], code: resp.statusCode, - message: body || 'Error during request.', + message: 'Error during request.', response: resp - })); - return; + }); + } + + if (util.is(body, 'string')) { + try { + parsedResponse.body = JSON.parse(body); + } catch(err) {} + } + + if (parsedResponse.body && parsedResponse.body.error) { + // Error from JSON API. + parsedResponse.err = new ApiError(parsedResponse.body.error); } - callback(null, body, resp); + + return parsedResponse; } -util.handleResp = handleResp; +util.parseApiResp = parseApiResp; /** * Get the type of a value. @@ -418,20 +455,6 @@ function makeWritableStream(dup, options, onComplete) { util.makeWritableStream = makeWritableStream; -/** - * Returns an exponential distributed time to wait given the number of retries - * that have been previously been attempted on the request. - * - * @param {number} retryNumber - The number of retries previously attempted. - * @return {number} An exponentially distributed time to wait E.g. for use with - * exponential backoff. - */ -function getNextRetryWait(retryNumber) { - return (Math.pow(2, retryNumber) * 1000) + Math.floor(Math.random() * 1000); -} - -util.getNextRetryWait = getNextRetryWait; - /** * Returns true if the API request should be retried, given the error that was * given the first time the request was attempted. This is used for rate limit @@ -599,31 +622,45 @@ function makeAuthorizedRequestFactory(config) { * be made. Instead, this function is passed the error & authorized * request options. */ - function makeAuthorizedRequest(reqOpts, callback) { - if (config.customEndpoint) { - // Using a custom API override. Do not use `google-auth-library` for - // authentication. (ex: connecting to a local Datastore server) - if (callback.onAuthorized) { - callback.onAuthorized(null, reqOpts); - } else { - util.makeRequest(reqOpts, config, callback); - } + function makeAuthorizedRequest(reqOpts, options) { + var stream; + var reqConfig = extend({}, config); - return; + if (!options) { + stream = through(); + reqConfig.stream = stream; } - util.authorizeRequest(config, reqOpts, function(err, authorizedReqOpts) { + function onAuthorized(err, authorizedReqOpts) { if (err) { - (callback.onAuthorized || callback)(err); + if (stream) { + stream.emit('error', err); + stream.end(); + } else { + (options.onAuthorized || options)(err); + } + return; } - if (callback.onAuthorized) { - callback.onAuthorized(null, authorizedReqOpts); + if (options && options.onAuthorized) { + options.onAuthorized(null, authorizedReqOpts); } else { - util.makeRequest(authorizedReqOpts, config, callback); + util.makeRequest(authorizedReqOpts, reqConfig, options); } - }); + } + + if (reqConfig.customEndpoint) { + // Using a custom API override. Do not use `google-auth-library` for + // authentication. (ex: connecting to a local Datastore server) + onAuthorized(null, reqOpts); + } else { + util.authorizeRequest(reqConfig, reqOpts, onAuthorized); + } + + if (stream) { + return stream; + } } makeAuthorizedRequest.getCredentials = function(callback) { @@ -653,8 +690,8 @@ function makeAuthorizedRequestFactory(config) { util.makeAuthorizedRequestFactory = makeAuthorizedRequestFactory; /** - * Make a request through the `request` module with built-in error handling and - * exponential back off. + * Make a request through the `retryRequest` module with built-in error handling + * and exponential back off. * * @param {object} reqOpts - Request options in the format `request` expects. * @param {object=} config - Configuration object. @@ -676,30 +713,26 @@ function makeRequest(reqOpts, config, callback) { reqOpts = util.decorateRequest(reqOpts); - var MAX_RETRIES = config.maxRetries || 3; - var autoRetry = config.autoRetry !== false ? true : false; - var attemptedRetries = 0; + var options = { + request: request, - function shouldRetry(err) { - return autoRetry && - MAX_RETRIES > attemptedRetries && - util.shouldRetryRequest(err); - } + retries: config.autoRetry !== false ? config.maxRetries || 3 : 0, - function makeRateLimitedRequest() { - request(reqOpts, function(err, resp, body) { - util.handleResp(err, resp, body, function(err, body, resp) { - if (shouldRetry(err)) { - var delay = util.getNextRetryWait(attemptedRetries++); - setTimeout(makeRateLimitedRequest, delay); - } else { - callback(err || null, body, resp); - } - }); + shouldRetryFn: function(resp) { + var err = util.parseApiResp(resp).err; + return err && util.shouldRetryRequest(err); + } + }; + + if (config.stream) { + // `streamForward` is used to re-emit the events the request stream receives + // on to the stream the user is holding (config.stream). + streamForward(retryRequest(reqOpts, options)).pipe(config.stream); + } else { + retryRequest(reqOpts, options, function(err, response, body) { + util.handleResp(err, response, body, callback); }); } - - makeRateLimitedRequest(); } util.makeRequest = makeRequest; diff --git a/lib/storage/file.js b/lib/storage/file.js index 4e9557190e8..4ab4d472e41 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -417,8 +417,6 @@ File.prototype.createReadStream = function(options) { var tailRequest = options.end < 0; var throughStream = streamEvents(through()); - var requestStream; - var validations = ['crc32c', 'md5']; var validation; @@ -459,18 +457,13 @@ File.prototype.createReadStream = function(options) { throughStream.destroy(); }); - var endRequestStream = once(function() { - requestStream.abort(); - requestStream.destroy(); - }); - - createAuthorizedReq(remoteFilePath); + makeAuthorizedReq(remoteFilePath); return throughStream; // Authenticate the request, then pipe the remote API request to the stream // returned to the user. - function createAuthorizedReq(uri) { + function makeAuthorizedReq(uri) { var reqOpts = { uri: uri }; @@ -495,99 +488,83 @@ File.prototype.createReadStream = function(options) { }; } - that.bucket.storage.makeAuthorizedRequest_(reqOpts, { - onAuthorized: function(err, authorizedReqOpts) { - if (err) { - endThroughStream(err); + // For data integrity, hash the contents of the stream as we receive it + // from the server. + var localCrcHash; + var localMd5Hash = crypto.createHash('md5'); + + var requestStream = that.bucket.storage.makeAuthorizedRequest_(reqOpts); + + requestStream + .on('error', endThroughStream) + + .on('response', throughStream.emit.bind(throughStream, 'response')) + + .on('data', function(chunk) { + if (crc32c) { + localCrcHash = crc.calculate(chunk, localCrcHash); + } + + if (md5) { + localMd5Hash.update(chunk); + } + }) + + .on('complete', function(res) { + if (rangeRequest) { + // Range requests can't receive data integrity checks. + endThroughStream(null, res); return; } - // For data integrity, hash the contents of the stream as we receive it - // from the server. - var localCrcHash; - var localMd5Hash = crypto.createHash('md5'); - - requestStream = request(authorizedReqOpts); - - requestStream - .on('error', function(err) { - endRequestStream(); - endThroughStream(err); - }) - - .on('response', throughStream.emit.bind(throughStream, 'response')) - - .on('data', function(chunk) { - if (crc32c) { - localCrcHash = crc.calculate(chunk, localCrcHash); - } - - if (md5) { - localMd5Hash.update(chunk); - } - }) - - .on('complete', function(res) { - util.handleResp(null, res, res.body, function(err, resp) { - if (err) { - endThroughStream(err, resp); - return; - } - - if (rangeRequest) { - // Range requests can't receive data integrity checks. - endThroughStream(null, resp); - return; - } - - var failed = false; - var crcFail = true; - var md5Fail = true; - - var hashes = {}; - res.headers['x-goog-hash'].split(',').forEach(function(hash) { - var hashType = hash.split('=')[0]; - hashes[hashType] = hash.substr(hash.indexOf('=') + 1); - }); - - var remoteMd5 = hashes.md5; - var remoteCrc = hashes.crc32c && hashes.crc32c.substr(4); - - if (crc32c) { - crcFail = - new Buffer([localCrcHash]).toString('base64') !== remoteCrc; - failed = crcFail; - } - - if (md5) { - md5Fail = localMd5Hash.digest('base64') !== remoteMd5; - failed = md5Fail; - } - - if (validation === 'all') { - failed = remoteMd5 ? md5Fail : crcFail; - } - - if (failed) { - var mismatchError = new Error([ - 'The downloaded data did not match the data from the server.', - 'To be sure the content is the same, you should download the', - 'file again.' - ].join(' ')); - mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH'; - - endThroughStream(mismatchError, resp); - } else { - endThroughStream(null, resp); - } - }); - }) - - .pipe(throughStream) - - .on('error', endRequestStream); - } - }); + var failed = false; + var crcFail = true; + var md5Fail = true; + + var hashes = {}; + res.headers['x-goog-hash'].split(',').forEach(function(hash) { + var hashType = hash.split('=')[0]; + hashes[hashType] = hash.substr(hash.indexOf('=') + 1); + }); + + var remoteMd5 = hashes.md5; + var remoteCrc = hashes.crc32c && hashes.crc32c.substr(4); + + if (crc32c) { + crcFail = + new Buffer([localCrcHash]).toString('base64') !== remoteCrc; + failed = crcFail; + } + + if (md5) { + md5Fail = localMd5Hash.digest('base64') !== remoteMd5; + failed = md5Fail; + } + + if (validation === 'all') { + failed = remoteMd5 ? md5Fail : crcFail; + } + + if (failed) { + var mismatchError = new Error([ + 'The downloaded data did not match the data from the server.', + 'To be sure the content is the same, you should download the', + 'file again.' + ].join(' ')); + mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH'; + + endThroughStream(mismatchError, res); + } else { + endThroughStream(null, res); + } + }) + + .pipe(throughStream) + + .on('error', function() { + requestStream.abort(); + requestStream.destroy(); + }); } }; diff --git a/package.json b/package.json index 26a1246d99c..b78ae3dce45 100644 --- a/package.json +++ b/package.json @@ -60,8 +60,10 @@ "once": "^1.3.1", "protobufjs": "^3.8.2", "request": "^2.53.0", + "retry-request": "^1.1.0", "sse4_crc32": "^3.1.0", "stream-events": "^1.0.1", + "stream-forward": "^2.0.0", "through2": "^0.6.3" }, "devDependencies": { diff --git a/system-test/storage.js b/system-test/storage.js index 0e07d9d50a5..a44adb23a57 100644 --- a/system-test/storage.js +++ b/system-test/storage.js @@ -609,6 +609,7 @@ describe('storage', function() { writable.on('complete', function() { file.createReadStream() + .on('error', done) .pipe(fs.createWriteStream(tmpFilePath)) .on('error', done) .on('finish', function() { diff --git a/test/common/util.js b/test/common/util.js index cf039ba67e6..ef1612fa55c 100644 --- a/test/common/util.js +++ b/test/common/util.js @@ -24,7 +24,9 @@ var extend = require('extend'); var googleAuthLibrary = require('google-auth-library'); var mockery = require('mockery'); var request = require('request'); +var retryRequest = require('retry-request'); var stream = require('stream'); +var streamForward = require('stream-forward'); var googleAuthLibrary_Override; function fakeGoogleAuthLibrary() { @@ -44,6 +46,16 @@ fakeRequest.defaults = function(defaultConfiguration) { return fakeRequest; }; +var retryRequest_Override; +function fakeRetryRequest() { + return (retryRequest_Override || retryRequest).apply(null, arguments); +} + +var streamForward_Override; +function fakeStreamForward() { + return (streamForward_Override || streamForward).apply(null, arguments); +} + describe('common/util', function() { var util; var utilOverrides = {}; @@ -51,6 +63,8 @@ describe('common/util', function() { before(function() { mockery.registerMock('google-auth-library', fakeGoogleAuthLibrary); mockery.registerMock('request', fakeRequest); + mockery.registerMock('retry-request', fakeRetryRequest); + mockery.registerMock('stream-forward', fakeStreamForward); mockery.enable({ useCleanCache: true, warnOnUnregistered: false @@ -80,6 +94,8 @@ describe('common/util', function() { beforeEach(function() { googleAuthLibrary_Override = null; request_Override = null; + retryRequest_Override = null; + streamForward_Override = null; utilOverrides = {}; }); @@ -140,41 +156,82 @@ describe('common/util', function() { describe('handleResp', function() { it('should handle errors', function(done) { - var defaultErr = new Error('new error'); - util.handleResp(defaultErr, null, null, function(err) { - assert.equal(err, defaultErr); + var error = new Error('Error.'); + + util.handleResp(error, null, null, function(err) { + assert.strictEqual(err, error); done(); }); }); - it('should handle body errors', function(done) { - var apiErr = { - errors: [{ foo: 'bar' }], - code: 400, - message: 'an error occurred' + it('should parse response', function(done) { + var resp = { a: 'b', c: 'd' }; + var body = { a: 'b', c: 'd' }; + + var returnedBody = { a: 'b', c: 'd' }; + var returnedResp = { a: 'b', c: 'd' }; + + utilOverrides.parseApiResp = function(resp_, body_) { + assert.strictEqual(resp_, resp); + assert.strictEqual(body_, body); + + return { + err: null, + body: returnedBody, + resp: returnedResp + }; }; - util.handleResp(null, {}, { error: apiErr }, function(err) { - assert.deepEqual(err.errors, apiErr.errors); - assert.strictEqual(err.code, apiErr.code); - assert.deepEqual(err.message, apiErr.message); + + util.handleResp(null, resp, body, function(err, body, resp) { + assert.ifError(err); + assert.strictEqual(body, returnedBody); + assert.strictEqual(resp, returnedResp); done(); }); }); - it('should try to parse JSON if body is string', function(done) { - var body = '{ "foo": "bar" }'; - util.handleResp(null, {}, body, function(err, body) { - assert.strictEqual(body.foo, 'bar'); + it('should parse response for error', function(done) { + var error = new Error('Error.'); + + utilOverrides.parseApiResp = function() { + return { err: error }; + }; + + util.handleResp(null, {}, {}, function(err) { + assert.strictEqual(err, error); done(); }); }); + }); - it('should return err code if there are not other errors', function(done) { - util.handleResp(null, { statusCode: 400 }, null, function(err) { - assert.strictEqual(err.code, 400); - assert.strictEqual(err.message, 'Error during request.'); - done(); - }); + describe('parseApiResp', function() { + it('should return err code if there are not other errors', function() { + var parsedApiResp = util.parseApiResp({ statusCode: 400 }); + + assert.strictEqual(parsedApiResp.err.code, 400); + assert.strictEqual(parsedApiResp.err.message, 'Error during request.'); + }); + + it('should detect body errors', function() { + var apiErr = { + errors: [{ foo: 'bar' }], + code: 400, + message: 'an error occurred' + }; + + var parsedApiResp = util.parseApiResp({}, { error: apiErr }); + + assert.deepEqual(parsedApiResp.err.errors, apiErr.errors); + assert.strictEqual(parsedApiResp.err.code, apiErr.code); + assert.deepEqual(parsedApiResp.err.message, apiErr.message); + }); + + it('should try to parse JSON if body is string', function() { + var body = '{ "foo": "bar" }'; + + var parsedApiResp = util.parseApiResp({}, body); + + assert.strictEqual(parsedApiResp.body.foo, 'bar'); }); }); @@ -589,7 +646,28 @@ describe('common/util', function() { }; var makeAuthorizedRequest = util.makeAuthorizedRequestFactory(config); - makeAuthorizedRequest(reqOpts); + makeAuthorizedRequest(reqOpts, {}); + }); + + it('should return a stream if callback is missing', function() { + utilOverrides.authorizeRequest = function() {}; + + var makeAuthorizedRequest = util.makeAuthorizedRequestFactory({}); + assert(makeAuthorizedRequest({}) instanceof stream.Stream); + }); + + it('should provide stream to authorizeRequest', function(done) { + var stream; + + utilOverrides.authorizeRequest = function(cfg) { + setImmediate(function() { + assert.strictEqual(cfg.stream, stream); + done(); + }); + }; + + var makeAuthorizedRequest = util.makeAuthorizedRequestFactory(); + stream = makeAuthorizedRequest(); }); describe('authorization errors', function() { @@ -597,14 +675,16 @@ describe('common/util', function() { beforeEach(function() { utilOverrides.authorizeRequest = function(cfg, rOpts, callback) { - callback(error); + setImmediate(function() { + callback(error); + }); }; }); it('should invoke the callback with error', function(done) { var makeAuthorizedRequest = util.makeAuthorizedRequestFactory(); makeAuthorizedRequest({}, function(err) { - assert.deepEqual(err, error); + assert.strictEqual(err, error); done(); }); }); @@ -613,21 +693,36 @@ describe('common/util', function() { var makeAuthorizedRequest = util.makeAuthorizedRequestFactory(); makeAuthorizedRequest({}, { onAuthorized: function(err) { - assert.deepEqual(err, error); + assert.strictEqual(err, error); done(); } }); }); + + it('should emit an error and end the stream', function(done) { + var makeAuthorizedRequest = util.makeAuthorizedRequestFactory(); + makeAuthorizedRequest({}).on('error', function(err) { + assert.strictEqual(err, error); + + var stream = this; + setImmediate(function() { + assert.strictEqual(stream._readableState.ended, true); + done(); + }); + }); + }); }); describe('authorization success', function() { var reqOpts = { a: 'b', c: 'd' }; - it('should return the authorized request to callback', function(done) { + beforeEach(function() { utilOverrides.authorizeRequest = function(cfg, rOpts, callback) { callback(null, rOpts); }; + }); + it('should return the authorized request to callback', function(done) { var makeAuthorizedRequest = util.makeAuthorizedRequestFactory(); makeAuthorizedRequest(reqOpts, { onAuthorized: function(err, authorizedReqOpts) { @@ -640,10 +735,6 @@ describe('common/util', function() { it('should make request with correct options', function(done) { var config = { a: 'b', c: 'd' }; - utilOverrides.authorizeRequest = function(cfg, rOpts, callback) { - callback(null, rOpts); - }; - utilOverrides.makeRequest = function(authorizedReqOpts, cfg, cb) { assert.deepEqual(authorizedReqOpts, reqOpts); assert.deepEqual(cfg, config); @@ -653,6 +744,20 @@ describe('common/util', function() { var makeAuthorizedRequest = util.makeAuthorizedRequestFactory(config); makeAuthorizedRequest(reqOpts, done); }); + + it('should provide stream to makeRequest', function(done) { + var stream; + + utilOverrides.makeRequest = function(authorizedReqOpts, cfg) { + setImmediate(function() { + assert.strictEqual(cfg.stream, stream); + done(); + }); + }; + + var makeAuthorizedRequest = util.makeAuthorizedRequestFactory({}); + stream = makeAuthorizedRequest(reqOpts); + }); }); }); @@ -831,170 +936,128 @@ describe('common/util', function() { }); }); - describe('getNextRetryWait', function() { - function secs(seconds) { - return seconds * 1000; - } - - it('should return exponential retry delay', function() { - [1, 2, 3, 4, 5].forEach(assertTime); - - function assertTime(retryNumber) { - var min = (Math.pow(2, retryNumber) * secs(1)); - var max = (Math.pow(2, retryNumber) * secs(1)) + secs(1); - - var time = util.getNextRetryWait(retryNumber); - - assert(time >= min && time <= max); + describe('makeRequest', function() { + var PKG = require('../../package.json'); + var USER_AGENT = 'gcloud-node/' + PKG.version; + var reqOpts = { a: 'b', c: 'd' }; + var expectedReqOpts = extend(true, {}, reqOpts, { + headers: { + 'User-Agent': USER_AGENT } }); - }); - describe('makeRequest', function() { - it('should decorate the request', function(done) { - var reqOpts = { a: 'b', c: 'd' }; + function testDefaultRetryRequestConfig(done) { + return function(reqOpts, config) { + assert.deepEqual(reqOpts, expectedReqOpts); + assert.equal(config.retries, 3); + assert.strictEqual(config.request, fakeRequest); - request_Override = util.noop; + var error = new Error('Error.'); + utilOverrides.parseApiResp = function() { + return { err: error }; + }; + utilOverrides.shouldRetryRequest = function(err) { + assert.strictEqual(err, error); + done(); + }; - utilOverrides.decorateRequest = function(reqOpts_) { - assert.strictEqual(reqOpts_, reqOpts); - done(); + config.shouldRetryFn(); }; + } - util.makeRequest(reqOpts, {}, assert.ifError); - }); - - it('should make a request', function(done) { - request_Override = function() { + var noRetryRequestConfig = { autoRetry: false }; + function testNoRetryRequestConfig(done) { + return function(reqOpts, config) { + assert.strictEqual(config.retries, 0); done(); }; + } - util.makeRequest({}, assert.ifError, {}); - }); + var customRetryRequestConfig = { maxRetries: 10 }; + function testCustomRetryRequestConfig(done) { + return function(reqOpts, config) { + assert.strictEqual(config.retries, customRetryRequestConfig.maxRetries); + done(); + }; + } - it('should let handleResp handle the response', function(done) { - var error = new Error('Error.'); - var response = { a: 'b', c: 'd' }; - var body = response.a; + it('should decorate the request', function(done) { + var reqOpts = { a: 'b', c: 'd' }; - request_Override = function(rOpts, callback) { - callback(error, response, body); - }; + retryRequest_Override = util.noop; - utilOverrides.handleResp = function(err, resp, bdy) { - assert.deepEqual(err, error); - assert.deepEqual(resp, response); - assert.deepEqual(bdy, body); + utilOverrides.decorateRequest = function(reqOpts_) { + assert.strictEqual(reqOpts_, reqOpts); done(); }; - util.makeRequest({}, {}, assert.ifError); + util.makeRequest(reqOpts, {}, assert.ifError); }); - describe('request errors', function() { - describe('non-rate limit error', function() { - it('should return error to callback', function(done) { - var nonRateLimitError = new Error('Error.'); - - request_Override = function(reqOpts, callback) { - callback(nonRateLimitError); - }; - - util.makeRequest({}, {}, function(err) { - assert.deepEqual(err, nonRateLimitError); - done(); - }); - }); + describe('stream mode', function() { + it('should pass the default options to retryRequest', function(done) { + retryRequest_Override = testDefaultRetryRequestConfig(done); + util.makeRequest(reqOpts, {}); }); - describe('rate limit errors', function() { - var rateLimitError = new Error('Rate limit error.'); - rateLimitError.code = 500; - - beforeEach(function() { - // Always return a rate limit error. - request_Override = function (reqOpts, callback) { - callback(rateLimitError); - }; - - // Always suggest retrying. - utilOverrides.shouldRetryRequest = function() { - return true; - }; - - // Always return a 0 retry wait. - utilOverrides.getNextRetryWait = function() { - return 0; - }; - }); - - it('should check with shouldRetryRequest', function(done) { - utilOverrides.shouldRetryRequest = function() { - done(); - }; - - util.makeRequest({}, {}, util.noop); - }); + it('should allow turning off retries to retryRequest', function(done) { + retryRequest_Override = testNoRetryRequestConfig(done); + util.makeRequest(reqOpts, noRetryRequestConfig); + }); - it('should default to 3 retries', function(done) { - var attempts = 0; - var expectedAttempts = 4; // the original request + 3 retries + it('should override number of retries to retryRequest', function(done) { + retryRequest_Override = testCustomRetryRequestConfig(done); + util.makeRequest(reqOpts, customRetryRequestConfig); + }); - utilOverrides.handleResp = function(err, resp, body, callback) { - attempts++; - callback(err); - }; + it('should forward the specified events to the stream', function(done) { + var requestStream = new stream.Stream(); + var userStream = new stream.Stream(); - util.makeRequest({}, {}, function(err) { - assert.equal(err, rateLimitError); - assert.equal(attempts, expectedAttempts); - done(); - }); - }); - - it('should allow max retries to be specified', function(done) { - var attempts = 0; - var maxRetries = 5; - var expectedAttempts = maxRetries + 1; // the original req + retryRequest_Override = function() { + return requestStream; + }; - utilOverrides.handleResp = function(err, resp, body, callback) { - attempts++; - callback(err); - }; + streamForward_Override = function(stream_) { + assert.strictEqual(stream_, requestStream); + done(); + return stream_; + }; - util.makeRequest({}, { maxRetries: maxRetries }, function(err) { - assert.equal(err, rateLimitError); - assert.equal(attempts, expectedAttempts); - done(); - }); - }); + util.makeRequest(reqOpts, { stream: userStream }); + }); + }); - it('should not retry reqs if autoRetry is false', function(done) { - var attempts = 0; - var expectedAttempts = 1; // the original req + describe('callback mode', function() { + it('should pass the default options to retryRequest', function(done) { + retryRequest_Override = testDefaultRetryRequestConfig(done); + util.makeRequest(reqOpts, {}, assert.ifError); + }); - utilOverrides.handleResp = function(err, resp, body, callback) { - attempts++; - callback(err); - }; + it('should allow turning off retries to retryRequest', function(done) { + retryRequest_Override = testNoRetryRequestConfig(done); + util.makeRequest(reqOpts, noRetryRequestConfig, assert.ifError); + }); - util.makeRequest({}, { autoRetry: false }, function(err) { - assert.equal(err, rateLimitError); - assert.equal(attempts, expectedAttempts); - done(); - }); - }); + it('should override number of retries to retryRequest', function(done) { + retryRequest_Override = testCustomRetryRequestConfig(done); + util.makeRequest(reqOpts, customRetryRequestConfig, assert.ifError); }); - }); - describe('request success', function() { - it('should let handleResp handle response', function(done) { - utilOverrides.handleResp = function() { - done(); + it('should let handleResp handle the response', function(done) { + var error = new Error('Error.'); + var response = { a: 'b', c: 'd' }; + var body = response.a; + + request_Override = function(rOpts, callback) { + callback(error, response, body); }; - request_Override = function(reqOpts, callback) { - callback(); + utilOverrides.handleResp = function(err, resp, body_) { + assert.strictEqual(err, error); + assert.strictEqual(resp, response); + assert.strictEqual(body_, body); + done(); }; util.makeRequest({}, {}, assert.ifError); diff --git a/test/storage/file.js b/test/storage/file.js index e38b8a6bc97..fb3a7cfabf5 100644 --- a/test/storage/file.js +++ b/test/storage/file.js @@ -119,7 +119,11 @@ describe('File', function() { beforeEach(function() { var options = { makeAuthorizedRequest_: function(req, callback) { - (callback.onAuthorized || callback)(null, req); + if (callback) { + (callback.onAuthorized || callback)(null, req); + } else { + return (request_Override || request_Cached)(req); + } } }; bucket = new Bucket(options, 'bucket-name'); @@ -463,7 +467,10 @@ describe('File', function() { versionedFile.bucket.storage.makeAuthorizedRequest_ = function(reqOpts) { assert.equal(reqOpts.qs.generation, 1); - done(); + setImmediate(function() { + done(); + }); + return new FakeDuplexify(); }; versionedFile.createReadStream(); @@ -493,7 +500,10 @@ describe('File', function() { file.bucket.storage.makeAuthorizedRequest_ = function(opts) { assert.equal(opts.uri, expectedPath); - done(); + setImmediate(function() { + done(); + }); + return new FakeDuplexify(); }; file.createReadStream(); @@ -503,10 +513,14 @@ describe('File', function() { var ERROR = new Error('Error.'); beforeEach(function() { - file.bucket.storage.makeAuthorizedRequest_ = function(opt, callback) { + file.bucket.storage.makeAuthorizedRequest_ = function(opts) { + var stream = (request_Override || request)(opts); + setImmediate(function() { - (callback.onAuthorized || callback)(ERROR); + stream.emit('error', ERROR); }); + + return stream; }; }); @@ -532,10 +546,13 @@ describe('File', function() { request_Override = getFakeRequest(); - file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) { - (callback.onAuthorized || callback)(null, fakeRequest); - assert.deepEqual(request_Override.getRequestOptions(), fakeRequest); - done(); + file.bucket.storage.makeAuthorizedRequest_ = function() { + setImmediate(function() { + assert.deepEqual(request_Override.getRequestOptions(), fakeRequest); + done(); + }); + + return request_Override(fakeRequest); }; file.createReadStream(); @@ -561,16 +578,6 @@ describe('File', function() { request_Override = getFakeFailedRequest(ERROR); }); - it('should end the request stream', function(done) { - var readStream = file.createReadStream(); - - readStream.once('error', function() { - assert(request_Override.wasRequestAborted()); - assert(request_Override.wasRequestDestroyed()); - done(); - }); - }); - it('should emit the error', function(done) { file.createReadStream() .once('error', function(err) { @@ -586,54 +593,6 @@ describe('File', function() { }); }); - describe('response', function() { - it('should use util.handleResp', function(done) { - var response = { - body: { - a: 'b', - c: 'd', - } - }; - - request_Override = getFakeSuccessfulRequest('body', response); - - handleResp_Override = function(err, resp, body) { - assert.strictEqual(err, null); - assert.deepEqual(resp, response); - assert.deepEqual(body, response.body); - done(); - }; - - file.createReadStream(); - }); - - describe('errors', function() { - var ERROR = new Error('Error.'); - - beforeEach(function() { - request_Override = getFakeSuccessfulRequest('body', { body: null }); - - handleResp_Override = function(err, resp, body, callback) { - callback(ERROR); - }; - }); - - it('should emit the error', function(done) { - var readStream = file.createReadStream(); - - readStream.once('error', function(err) { - assert.deepEqual(err, ERROR); - done(); - }); - }); - - it('should destroy the through stream', function(done) { - var readStream = file.createReadStream(); - readStream.destroy = done; - }); - }); - }); - describe('validation', function() { var data = 'test'; @@ -656,7 +615,11 @@ describe('File', function() { file.metadata.mediaLink = 'http://uri'; file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) { - (callback.onAuthorized || callback)(null, {}); + if (callback) { + (callback.onAuthorized || callback)(null, {}); + } else { + return (request_Override || request_Cached)(opts); + } }; });