Skip to content

Commit

Permalink
core (stream-router): allow setting maxApiCalls (#1332)
Browse files Browse the repository at this point in the history
* core (stream-router): allow setting maxApiCalls

* move logic to iterator

* iterator -> limiter

* re-arrange stream-events use

* tests.

* lint

* re-sort tests so overrides dont mess with docs

* clean up maxApiCalls property

* add system test for bigquery

* docs

* explain interceptors quirk
  • Loading branch information
stephenplusplus authored and callmehiphop committed Jun 2, 2016
1 parent 12d9cdd commit 4184a1a
Show file tree
Hide file tree
Showing 30 changed files with 679 additions and 269 deletions.
3 changes: 3 additions & 0 deletions lib/bigquery/dataset.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ Dataset.prototype.delete = function(options, callback) {
* @resource [Tables: list API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/tables/list}
*
* @param {object=} query - Configuration object.
* @param {boolean} query.autoPaginate - Have pagination handled automatically.
* Default: true.
* @param {number} query.maxApiCalls - Maximum number of API calls to make.
* @param {number} query.maxResults - Maximum number of results to return.
* @param {string} query.pageToken - Token returned from a previous call, to
* request the next page of results.
Expand Down
3 changes: 3 additions & 0 deletions lib/bigquery/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ BigQuery.prototype.dataset = function(id) {
* @param {boolean} query.all - List all datasets, including hidden ones.
* @param {boolean} query.autoPaginate - Have pagination handled automatically.
* Default: true.
* @param {number} query.maxApiCalls - Maximum number of API calls to make.
* @param {number} query.maxResults - Maximum number of results to return.
* @param {string} query.pageToken - Token returned from a previous call, to
* request the next page of results.
Expand Down Expand Up @@ -279,6 +280,7 @@ BigQuery.prototype.getDatasets = function(query, callback) {
* project.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number=} options.maxResults - Maximum number of results to return.
* @param {string=} options.pageToken - Token returned from a previous call, to
* request the next page of results.
Expand Down Expand Up @@ -407,6 +409,7 @@ BigQuery.prototype.job = function(id) {
* @param {string|object} options - A string SQL query or configuration object.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of results to read.
* @param {string} options.query - A query string, following the BigQuery query
* syntax, of the query to execute.
Expand Down
1 change: 1 addition & 0 deletions lib/bigquery/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ Job.prototype.cancel = function(callback) {
* @param {object=} options - Configuration object.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of results to read.
* @param {string} options.pageToken - Page token, returned by a previous call,
* to request the next page of results. Note: This is automatically added to
Expand Down
1 change: 1 addition & 0 deletions lib/bigquery/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ Table.prototype.export = function(destination, options, callback) {
* @param {object=} options - The configuration object.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of results to return.
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request
Expand Down
46 changes: 35 additions & 11 deletions lib/common/stream-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@

var arrify = require('arrify');
var concat = require('concat-stream');
var split = require('split-array-stream');
var extend = require('extend');
var is = require('is');
var streamEvents = require('stream-events');
var through = require('through2');
var split = require('split-array-stream');

/**
* @type {module:common/util}
* @private
*/
var util = require('./util.js');

/*! Developer Documentation
*
Expand Down Expand Up @@ -72,9 +77,10 @@ streamRouter.extend = function(Class, methodNames) {
*/
streamRouter.parseArguments_ = function(args) {
var query;
var callback;
var maxResults = -1;
var autoPaginate = true;
var maxApiCalls = -1;
var maxResults = -1;
var callback;

var firstArgument = args[0];
var lastArgument = args[args.length - 1];
Expand All @@ -90,6 +96,8 @@ streamRouter.parseArguments_ = function(args) {
}

if (is.object(query)) {
query = extend(true, {}, query);

// Check if the user only asked for a certain amount of results.
if (is.number(query.maxResults)) {
// `maxResults` is used API-wide.
Expand All @@ -99,6 +107,11 @@ streamRouter.parseArguments_ = function(args) {
maxResults = query.pageSize;
}

if (is.number(query.maxApiCalls)) {
maxApiCalls = query.maxApiCalls;
delete query.maxApiCalls;
}

if (callback &&
(maxResults !== -1 || // The user specified a limit.
query.autoPaginate === false)) {
Expand All @@ -108,9 +121,10 @@ streamRouter.parseArguments_ = function(args) {

return {
query: query || {},
callback: callback,
autoPaginate: autoPaginate,
maxApiCalls: maxApiCalls,
maxResults: maxResults,
autoPaginate: autoPaginate
callback: callback
};
};

Expand All @@ -126,6 +140,7 @@ streamRouter.parseArguments_ = function(args) {
* string in some places.
* @param {function=} parsedArguments.callback - Callback function.
* @param {boolean} parsedArguments.autoPaginate - Auto-pagination enabled.
* @param {boolean} parsedArguments.maxApiCalls - Maximum API calls to make.
* @param {number} parsedArguments.maxResults - Maximum results to return.
* @param {function} originalMethod - The cached method that accepts a callback
* and returns `nextQuery` to receive more results.
Expand Down Expand Up @@ -163,6 +178,7 @@ streamRouter.router_ = function(parsedArguments, originalMethod) {
* string in some places.
* @param {function=} parsedArguments.callback - Callback function.
* @param {boolean} parsedArguments.autoPaginate - Auto-pagination enabled.
* @param {boolean} parsedArguments.maxApiCalls - Maximum API calls to make.
* @param {number} parsedArguments.maxResults - Maximum results to return.
* @param {function} originalMethod - The cached method that accepts a callback
* and returns `nextQuery` to receive more results.
Expand All @@ -172,12 +188,20 @@ streamRouter.runAsStream_ = function(parsedArguments, originalMethod) {
var query = parsedArguments.query;
var resultsToSend = parsedArguments.maxResults;

var stream = streamEvents(through.obj());
var limiter = util.createLimiter(makeRequest, {
maxApiCalls: parsedArguments.maxApiCalls
});

var stream = limiter.stream;

stream.once('reading', function() {
originalMethod(query, onResultSet);
makeRequest(query);
});

function makeRequest(query) {
originalMethod(query, onResultSet);
}

function onResultSet(err, results, nextQuery) {
if (err) {
stream.destroy(err);
Expand All @@ -196,15 +220,15 @@ streamRouter.runAsStream_ = function(parsedArguments, originalMethod) {
}

if (nextQuery && resultsToSend !== 0) {
originalMethod(nextQuery, onResultSet);
limiter.makeRequest(nextQuery);
return;
}

stream.push(null);
});
}

return stream;
return limiter.stream;
};

module.exports = streamRouter;
41 changes: 41 additions & 0 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var request = require('request').defaults({
}
});
var retryRequest = require('retry-request');
var streamEvents = require('stream-events');
var through = require('through2');
var uniq = require('array-uniq');

Expand Down Expand Up @@ -545,3 +546,43 @@ function normalizeArguments(globalContext, localConfig, options) {
}

util.normalizeArguments = normalizeArguments;

/**
* Limit requests according to a `maxApiCalls` limit.
*
* @param {function} makeRequestFn - The function that will be called.
* @param {object=} options - Configuration object.
* @param {number} options.maxApiCalls - The maximum number of API calls to
* make.
*/
function createLimiter(makeRequestFn, options) {
var stream = streamEvents(through.obj());

var requestsMade = 0;
var requestsToMake = -1;

options = options || {};

if (is.number(options.maxApiCalls)) {
requestsToMake = options.maxApiCalls;
}

return {
makeRequest: function() {
requestsMade++;

if (requestsToMake >= 0 && requestsMade > requestsToMake) {
stream.push(null);
return;
}

makeRequestFn.apply(null, arguments);

return stream;
},

stream: stream
};
}

util.createLimiter = createLimiter;
14 changes: 14 additions & 0 deletions lib/compute/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ Compute.prototype.firewall = function(name) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of addresses to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -716,6 +717,7 @@ Compute.prototype.getAddresses = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of addresses to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -834,6 +836,7 @@ Compute.prototype.getAutoscalers = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of disks to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -947,6 +950,7 @@ Compute.prototype.getDisks = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of instance groups to
* return.
* @param {string} options.pageToken - A previously-returned page token
Expand Down Expand Up @@ -1062,6 +1066,7 @@ Compute.prototype.getInstanceGroups = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of firewalls to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -1170,6 +1175,7 @@ Compute.prototype.getFirewalls = function(options, callback) {
* can be a regular expression.
* @param {boolean} options.https - List only HTTPs health checks. Default:
* `false`.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of networks to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -1280,6 +1286,7 @@ Compute.prototype.getHealthChecks = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of networks to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -1385,6 +1392,7 @@ Compute.prototype.getNetworks = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of operations to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -1490,6 +1498,7 @@ Compute.prototype.getOperations = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of instances to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -1592,6 +1601,7 @@ Compute.prototype.getRegions = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of rules to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -1696,6 +1706,7 @@ Compute.prototype.getRules = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of snapshots to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -1801,6 +1812,7 @@ Compute.prototype.getServices = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of snapshots to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -1906,6 +1918,7 @@ Compute.prototype.getSnapshots = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of instances to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -2019,6 +2032,7 @@ Compute.prototype.getVMs = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of instances to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down
2 changes: 2 additions & 0 deletions lib/compute/instance-group.js
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ InstanceGroup.prototype.delete = function(callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of VMs to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
* @param {boolean} options.running - Only return instances which are running.
Expand Down
1 change: 1 addition & 0 deletions lib/compute/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ Network.prototype.firewall = function(name) {
* @param {object=} options - Firewall search options.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of firewalls to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down
3 changes: 3 additions & 0 deletions lib/compute/region.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ Region.prototype.createRule = function(name, config, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of addresses to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -387,6 +388,7 @@ Region.prototype.getAddresses = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of operations to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down Expand Up @@ -491,6 +493,7 @@ Region.prototype.getOperations = function(options, callback) {
* (not equal)
* - **`filterString`**: the string to filter to. For string fields, this
* can be a regular expression.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of rules to return.
* @param {string} options.pageToken - A previously-returned page token
* representing part of the larger set of results to view.
Expand Down
Loading

0 comments on commit 4184a1a

Please sign in to comment.