From 493e9404d71c496ea83a6badf2ffd4c22a02220c Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Thu, 25 Jun 2015 14:55:06 -0400 Subject: [PATCH 1/9] streamrouter: implement across bigquery --- lib/bigquery/index.js | 172 +++++++++++++++++----------------------- lib/bigquery/job.js | 2 +- lib/bigquery/table.js | 53 ++++++------- system-test/bigquery.js | 49 ++++++++++-- test/bigquery/index.js | 161 ++++++++++--------------------------- test/bigquery/table.js | 107 ++++++------------------- 6 files changed, 208 insertions(+), 336 deletions(-) diff --git a/lib/bigquery/index.js b/lib/bigquery/index.js index ac4248971b1..9df378425de 100644 --- a/lib/bigquery/index.js +++ b/lib/bigquery/index.js @@ -21,8 +21,6 @@ 'use strict'; var extend = require('extend'); -var streamEvents = require('stream-events'); -var through = require('through2'); /** * @type {module:bigquery/dataset} @@ -36,6 +34,12 @@ var Dataset = require('./dataset.js'); */ var Job = require('./job.js'); +/** + * @type {module:common/streamrouter} + * @private + */ +var streamRouter = require('../common/stream-router.js'); + /** * @type {module:bigquery/table} * @private @@ -167,6 +171,18 @@ BigQuery.prototype.dataset = function(id) { * bigquery.getDatasets(function(err, datasets, nextQuery, apiResponse) { * // If `nextQuery` is non-null, there are more results to fetch. * }); + * + * //- + * // Get the datasets from your project as a readable object stream. + * //- + * bigquery.getDatasets() + * .on('error', console.error) + * .on('data', function(dataset) { + * // dataset is a Dataset object. + * }) + * .on('end', function() { + * // All datasets retrieved. + * }); */ BigQuery.prototype.getDatasets = function(query, callback) { var that = this; @@ -222,6 +238,18 @@ BigQuery.prototype.getDatasets = function(query, callback) { * bigquery.getJobs(function(err, jobs, nextQuery, apiResponse) { * // If `nextQuery` is non-null, there are more results to fetch. * }); + * + * //- + * // Get the jobs from your project as a readable object stream. + * //- + * bigquery.getJobs() + * .on('error', console.error) + * .on('data', function(job) { + * // job is a Job object. + * }) + * .on('end', function() { + * // All jobs retrieved. + * }); */ BigQuery.prototype.getJobs = function(options, callback) { var that = this; @@ -270,31 +298,6 @@ BigQuery.prototype.job = function(id) { return new Job(this, id); }; -/*! Developer Documentation - * - * The `query` method is dual-purpose, like the use cases for a query. - * Sometimes, a user will want to fetch results from their table in a serial - * manner (get results -> more results exist? -> get more results, repeat.) -- - * other times, a user may want to wave their hands at making repeated calls to - * get all of the rows, instead using a stream. - * - * A couple different libraries are used to cover the stream case: - * - * var stream = streamEvents(through2.obj()); - * - * - streamEvents - https://github.com/stephenplusplus/stream-events - * This library enables us to wait until our stream is being asked for - * data, before making any API calls. It is possible a user will get a - * stream, then not end up running it - or, it will be run later, at a - * time when the token returned from the API call could have expired. - * Using this library ensures we wait until the last possible chance to - * get that token. - * - * - through2 - https://github.com/rvagg/through2 - * This is a popular library for how simple it makes dealing with the - * complicated Node.js Streams API. We're creating an object stream, as - * the data we are receiving from the API are rows of JSON data. - */ /** * Run a query scoped to your project. * @@ -340,18 +343,17 @@ BigQuery.prototype.job = function(id) { * // You can also use the `query` method as a readable object stream by * // omitting the callback. * //- - * var through2 = require('through2'); - * * bigquery.query(query) - * .pipe(through2.obj(function(row, enc, next) { - * this.push(row.url += '?trackingCode=AZ19b\n'); - * next(); - * })) - * .pipe(process.stdout); + * .on('error', console.error) + * .on('data', function(row) { + * // row is a result from your query. + * }) + * .on('end', function() { + * // All rows retrieved. + * }); */ BigQuery.prototype.query = function(options, callback) { var that = this; - var stream; if (util.is(options, 'string')) { options = { @@ -366,79 +368,42 @@ BigQuery.prototype.query = function(options, callback) { var requestQuery = extend({}, options); delete requestQuery.job; - if (!util.is(callback, 'function')) { - stream = streamEvents(through.obj()); - stream.once('reading', runQuery); - return stream; + if (job) { + // Get results of the query. + var path = '/queries/' + job.id; + that.makeReq_('GET', path, requestQuery, null, responseHandler); } else { - callback = callback || util.noop; - runQuery(); + // Create a job. + that.makeReq_('POST', '/queries', null, options, responseHandler); } - function runQuery() { - if (job) { - // Get results of the query. - var path = '/queries/' + job.id; - that.makeReq_('GET', path, requestQuery, null, responseHandler); - } else { - // Create a job. - that.makeReq_('POST', '/queries', null, options, responseHandler); + function responseHandler(err, resp) { + if (err) { + callback(err, null, null, resp); + return; } - function responseHandler(err, resp) { - if (err) { - onComplete(err, null, null, resp); - return; - } - - var rows = []; - if (resp.schema && resp.rows) { - rows = Table.mergeSchemaWithRows_(resp.schema, resp.rows); - } - - var nextQuery = null; - if (resp.jobComplete === false) { - // Query is still running. - nextQuery = extend({}, options); - } else if (resp.pageToken) { - // More results exist. - nextQuery = extend({}, options, { - pageToken: resp.pageToken - }); - } - if (nextQuery && !nextQuery.job && resp.jobReference.jobId) { - // Create a prepared Job to continue the query. - nextQuery.job = that.job(resp.jobReference.jobId); - } - - onComplete(null, rows, nextQuery, resp); + var rows = []; + if (resp.schema && resp.rows) { + rows = Table.mergeSchemaWithRows_(resp.schema, resp.rows); } - function onComplete(err, rows, nextQuery, resp) { - if (err) { - if (stream) { - stream.emit('error', err); - stream.end(); - } else { - callback(err, null, null, resp); - } - return; - } - - if (stream) { - rows.forEach(function(row) { - stream.push(row); - }); - - if (nextQuery) { - that.query(nextQuery, onComplete); - } else { - stream.end(); - } - } else { - callback(null, rows, nextQuery, resp); - } + var nextQuery = null; + if (resp.jobComplete === false) { + // Query is still running. + nextQuery = extend({}, options); + } else if (resp.pageToken) { + // More results exist. + nextQuery = extend({}, options, { + pageToken: resp.pageToken + }); + } + if (nextQuery && !nextQuery.job && resp.jobReference.jobId) { + // Create a prepared Job to continue the query. + nextQuery.job = that.job(resp.jobReference.jobId); } + + callback(null, rows, nextQuery, resp); } }; @@ -564,4 +529,11 @@ BigQuery.prototype.makeReq_ = function(method, path, query, body, callback) { this.makeAuthorizedRequest_(reqOpts, callback); }; +/*! Developer Documentation + * + * These methods can be used with either a callback or as a readable object + * stream. `streamRouter` is used to add this dual behavior. + */ +streamRouter.extend(BigQuery, ['getDatasets', 'getJobs', 'query']); + module.exports = BigQuery; diff --git a/lib/bigquery/job.js b/lib/bigquery/job.js index bab5465c5fd..87c1e481e10 100644 --- a/lib/bigquery/job.js +++ b/lib/bigquery/job.js @@ -120,7 +120,7 @@ Job.prototype.getMetadata = function(callback) { * job.getQueryResults(options, function(err, rows, nextQuery, apiResponse) {}); * * //- - * // Consume the results from the query as a readable stream. + * // Consume the results from the query as a readable object stream. * //- * var through2 = require('through2'); * var fs = require('fs'); diff --git a/lib/bigquery/table.js b/lib/bigquery/table.js index 6b031af55ff..814cc0bdacb 100644 --- a/lib/bigquery/table.js +++ b/lib/bigquery/table.js @@ -26,7 +26,6 @@ var extend = require('extend'); var fs = require('fs'); var path = require('path'); var streamEvents = require('stream-events'); -var through = require('through2'); /** * @type {module:storage/file} @@ -34,6 +33,12 @@ var through = require('through2'); */ var File = require('../storage/file'); +/** + * @type {module:common/streamrouter} + * @private + */ +var streamRouter = require('../common/stream-router.js'); + /** * @type {module:common/util} * @private @@ -191,7 +196,8 @@ Table.prototype.copy = function(destination, metadata, callback) { }; /** - * Create a readable stream of the rows of data in your table. + * Create a readable stream of the rows of data in your table. This method is + * simply a wrapper around {module:bigquery/table#getRows}. * * @return {ReadStream} * @@ -206,31 +212,7 @@ Table.prototype.copy = function(destination, metadata, callback) { * .pipe(fs.createWriteStream('./test/testdata/testfile.json')); */ Table.prototype.createReadStream = function() { - var that = this; - - var stream = streamEvents(through.obj()); - stream.once('reading', function() { - that.getRows(handleResponse); - }); - return stream; - - function handleResponse(err, rows, nextQuery) { - if (err) { - stream.emit('error', err); - stream.end(); - return; - } - - rows.forEach(function(row) { - stream.push(row); - }); - - if (nextQuery) { - that.getRows(nextQuery, handleResponse); - } else { - stream.end(); - } - } + return this.getRows(); }; /** @@ -515,6 +497,16 @@ Table.prototype.getMetadata = function(callback) { * table.getRows(nextQuery, function(err, rows, nextQuery, apiResponse) {}); * } * }); + * + * //- + * // Get the rows as a readable object stream. + * //- + * table.getRows(options) + * .on('error', console.error) + * .on('data', function(row) {}) + * .on('end', function() { + * // All rows have been retrieved. + * }); */ Table.prototype.getRows = function(options, callback) { var that = this; @@ -859,4 +851,11 @@ Table.prototype.makeReq_ = function(method, path, query, body, callback) { this.dataset.makeReq_(method, path, query, body, callback); }; +/*! Developer Documentation + * + * These methods can be used with either a callback or as a readable object + * stream. `streamRouter` is used to add this dual behavior. + */ +streamRouter.extend(Table, 'getRows'); + module.exports = Table; diff --git a/system-test/bigquery.js b/system-test/bigquery.js index 9b8188dbe5b..7e2a4e2fcf4 100644 --- a/system-test/bigquery.js +++ b/system-test/bigquery.js @@ -138,6 +138,20 @@ describe('BigQuery', function() { }); }); + it('should list datasets as a stream', function(done) { + var datasetEmitted = false; + + bigquery.getDatasets() + .on('error', done) + .on('data', function(dataset) { + datasetEmitted = dataset instanceof Dataset; + }) + .on('end', function() { + assert.strictEqual(datasetEmitted, true); + done(); + }); + }); + it('should run a query job, then get results', function(done) { bigquery.startQuery(query, function(err, job) { assert.ifError(err); @@ -160,7 +174,7 @@ describe('BigQuery', function() { assert.equal(typeof row.url, 'string'); }) .on('error', done) - .on('finish', function() { + .on('end', function() { assert.equal(rowsEmitted, 100); done(); }); @@ -186,6 +200,20 @@ describe('BigQuery', function() { }); }); + it('should list jobs as a stream', function(done) { + var jobEmitted = false; + + bigquery.getJobs() + .on('error', done) + .on('data', function(job) { + jobEmitted = job instanceof Job; + }) + .on('end', function() { + assert.strictEqual(jobEmitted, true); + done(); + }); + }); + describe('BigQuery/Dataset', function() { it('should set & get metadata', function(done) { dataset.setMetadata({ @@ -216,12 +244,19 @@ describe('BigQuery', function() { }); }); - it('should insert rows', function(done) { - table.insert([ - { name: 'silvano', breed: 'the cat kind', id: 1, dob: Date.now() }, - { name: 'ryan', breed: 'golden retriever?', id: 2, dob: Date.now() }, - { name: 'stephen', breed: 'idkanycatbreeds', id: 3, dob: Date.now() } - ], done); + it('should get the rows in a table', function(done) { + table.getRows(function(err, rows) { + assert.ifError(err); + assert(Array.isArray(rows)); + done(); + }); + }); + + it('should get the rows in a table via stream', function(done) { + table.getRows() + .on('error', done) + .on('data', function() {}) + .on('end', done); }); it('should insert rows via stream', function(done) { diff --git a/test/bigquery/index.js b/test/bigquery/index.js index ee178adc987..dbc1c0f4133 100644 --- a/test/bigquery/index.js +++ b/test/bigquery/index.js @@ -14,15 +14,12 @@ * limitations under the License. */ -/*global describe, it, beforeEach, before, after */ - 'use strict'; var assert = require('assert'); var mockery = require('mockery'); -var Stream = require('stream').Stream; -var Table = require('../../lib/bigquery/table'); -var util = require('../../lib/common/util'); +var Table = require('../../lib/bigquery/table.js'); +var util = require('../../lib/common/util.js'); function FakeTable(a, b) { Table.call(this, a, b); @@ -35,11 +32,19 @@ FakeTable.mergeSchemaWithRows_ = function() { .apply(null, args); }; -function fakeGsa() { - return function(req, callback) { - callback(null, req); - }; -} +var extended = false; +var fakeStreamRouter = { + extend: function(Class, methods) { + if (Class.name !== 'BigQuery') { + return; + } + + methods = util.arrayize(methods); + assert.equal(Class.name, 'BigQuery'); + assert.deepEqual(methods, ['getDatasets', 'getJobs', 'query']); + extended = true; + } +}; describe('BigQuery', function() { var JOB_ID = 'JOB_ID'; @@ -50,7 +55,7 @@ describe('BigQuery', function() { before(function() { mockery.registerMock('./table.js', FakeTable); - mockery.registerMock('google-service-account', fakeGsa); + mockery.registerMock('../common/stream-router.js', fakeStreamRouter); mockery.enable({ useCleanCache: true, warnOnUnregistered: false @@ -68,6 +73,10 @@ describe('BigQuery', function() { }); describe('instantiation', function() { + it('should extend the correct methods', function() { + assert(extended); // See `fakeStreamRouter.extend` + }); + it('should throw if a projectId is not specified', function() { assert.throws(function() { new BigQuery(); @@ -395,19 +404,6 @@ describe('BigQuery', function() { bq.query(options, assert.ifError); }); - it('should be a stream if a callback is omitted', function() { - assert(bq.query() instanceof Stream); - }); - - it('should run the query after being read from', function(done) { - bq.makeReq_ = function() { - done(); - }; - - var stream = bq.query(); - stream.emit('reading'); - }); - describe('job is incomplete', function() { var options = {}; @@ -503,113 +499,38 @@ describe('BigQuery', function() { bq.query({}, assert.ifError); }); - describe('errors', function() { + it('should pass errors to the callback', function(done) { var error = new Error('Error.'); - beforeEach(function() { - bq.makeReq_ = function(method, path, query, body, callback) { - callback(error); - }; - }); - - describe('serial', function() { - it('should pass errors to the callback', function(done) { - bq.query({}, function(err) { - assert.equal(err, error); - done(); - }); - }); - }); - - describe('streams', function() { - it('should emit errors', function(done) { - bq.query() - .once('error', function(err) { - assert.equal(err, error); - done(); - }) - .emit('reading'); - }); + bq.makeReq_ = function(method, path, query, body, callback) { + callback(error); + }; - it('should end the stream', function(done) { - bq.query() - .once('error', util.noop) - .once('finish', done) - .emit('reading'); - }); + bq.query({}, function(err) { + assert.equal(err, error); + done(); }); }); - describe('results', function() { + it('should return rows to the callback', function(done) { var ROWS = [{ a: 'b' }, { c: 'd' }]; - beforeEach(function() { - bq.makeReq_ = function(method, path, query, body, callback) { - callback(null, { - jobReference: { jobId: JOB_ID }, - rows: [], - schema: {} - }); - }; - - mergeSchemaWithRows_Override = function() { - mergeSchemaWithRows_Override = null; - return ROWS; - }; - }); - - describe('serial', function() { - it('should return rows to callback', function(done) { - bq.query({}, function(err, rows) { - assert.deepEqual(rows, ROWS); - done(); - }); - }); - }); - - describe('streams', function() { - it('should emit rows to stream', function(done) { - var rowsEmitted = 0; - bq.query() - .on('data', function(row) { - assert.deepEqual(row, ROWS[rowsEmitted]); - rowsEmitted++; - }) - .on('end', function() { - assert.equal(rowsEmitted, ROWS.length); - done(); - }); + bq.makeReq_ = function(method, path, query, body, callback) { + callback(null, { + jobReference: { jobId: JOB_ID }, + rows: [], + schema: {} }); + }; - it('should call .query() with nextQuery automatically', function(done) { - var queryCalled = 0; - var pageToken = 'token'; - - bq.makeReq_ = function(method, path, query, body, callback) { - callback(null, { - jobReference: { jobId: JOB_ID }, - pageToken: pageToken - }); - }; - - var query = bq.query; - bq.query = function(options) { - queryCalled++; - - if (queryCalled === 1) { - return query.apply(bq, [].slice.call(arguments)); - } else { - assert.deepEqual(options.pageToken, pageToken); - done(); - } - }; - - bq.query().emit('reading'); - }); + mergeSchemaWithRows_Override = function() { + mergeSchemaWithRows_Override = null; + return ROWS; + }; - it('should end the stream if there is no nextQuery', function(done) { - bq.query().on('finish', done).emit('reading'); - }); + bq.query({}, function(err, rows) { + assert.deepEqual(rows, ROWS); + done(); }); }); }); diff --git a/test/bigquery/table.js b/test/bigquery/table.js index 94e3bcadae2..e225abc1d96 100644 --- a/test/bigquery/table.js +++ b/test/bigquery/table.js @@ -38,6 +38,20 @@ var fakeUtil = extend({}, util, { } }); +var extended = false; +var fakeStreamRouter = { + extend: function(Class, methods) { + if (Class.name !== 'Table') { + return; + } + + methods = util.arrayize(methods); + assert.equal(Class.name, 'Table'); + assert.deepEqual(methods, ['getRows']); + extended = true; + } +}; + describe('BigQuery/Table', function() { var DATASET = { id: 'dataset-id', @@ -66,6 +80,7 @@ describe('BigQuery/Table', function() { before(function() { mockery.registerMock('../storage/file', FakeFile); + mockery.registerMock('../common/stream-router.js', fakeStreamRouter); mockery.registerMock('../common/util', fakeUtil); mockery.enable({ useCleanCache: true, @@ -84,6 +99,12 @@ describe('BigQuery/Table', function() { table = new Table(DATASET, TABLE_ID); }); + describe('instantiation', function() { + it('should extend the correct methods', function() { + assert(extended); // See `fakeStreamRouter.extend` + }); + }); + describe('createSchemaFromString_', function() { it('should create a schema object from a string', function() { assert.deepEqual( @@ -234,91 +255,15 @@ describe('BigQuery/Table', function() { }); describe('createReadStream', function() { - it('should return a stream', function() { - assert(table.createReadStream() instanceof stream.Stream); - }); + it('should return table.getRows()', function() { + var uniqueReturnValue = 'abc123'; - it('should call getRows() when asked for data', function(done) { table.getRows = function() { - done(); - }; - table.createReadStream().emit('reading'); - }); - - it('should emit rows', function(done) { - var rows = [{ a: 'b' }, { c: 'd' }]; - var rowsEmitted = 0; - - table.getRows = function(handler) { - handler(null, rows); + assert.equal(arguments.length, 0); + return uniqueReturnValue; }; - table.createReadStream() - .on('data', function(row) { - assert.deepEqual(row, rows[rowsEmitted]); - rowsEmitted++; - }) - .on('end', function() { - assert.equal(rowsEmitted, rows.length); - done(); - }); - }); - - it('should call getRows() if nextQuery exists', function(done) { - var called = 0; - - var nextQuery = { a: 'b', c: 'd' }; - var responseHandler; - - table.getRows = function(query, handler) { - responseHandler = responseHandler || handler || query; - - called++; - - if (called === 2) { - assert.deepEqual(query, nextQuery); - assert.equal(responseHandler, handler); - done(); - } else { - responseHandler(null, [], nextQuery); - } - }; - - table.createReadStream().emit('reading'); - }); - - it('should end the stream when nextQuery is not present', function(done) { - table.getRows = function(handler) { - handler(null, []); - }; - - table.createReadStream().on('finish', done).emit('reading'); - }); - - describe('errors', function() { - var error = new Error('Error.'); - - beforeEach(function() { - table.getRows = function(handler) { - handler(error); - }; - }); - - it('should emit errors', function(done) { - table.createReadStream() - .once('error', function(err) { - assert.equal(err, error); - done(); - }) - .emit('reading'); - }); - - it('should end the stream', function(done) { - table.createReadStream() - .once('error', util.noop) - .once('finish', done) - .emit('reading'); - }); + assert.equal(table.createReadStream(), uniqueReturnValue); }); }); From 24b8f0b11cd11c64cf9b1ba96c366b69061264ae Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Thu, 25 Jun 2015 16:38:53 -0400 Subject: [PATCH 2/9] streamrouter: support replacing last argument when undefined --- lib/common/stream-router.js | 13 ++++++++++--- test/common/stream-router.js | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/lib/common/stream-router.js b/lib/common/stream-router.js index a2ae6d15c74..0885098e99f 100644 --- a/lib/common/stream-router.js +++ b/lib/common/stream-router.js @@ -81,8 +81,8 @@ streamRouter.extend = function(Class, methodNames) { */ streamRouter.router_ = function(args, originalMethod) { args = util.toArray(args); - var callback = args[args.length - 1]; - var isStreamMode = !util.is(callback, 'function'); + var lastArgument = args[args.length - 1]; + var isStreamMode = !util.is(lastArgument, 'function'); if (!isStreamMode) { originalMethod.apply(null, args); @@ -128,7 +128,14 @@ streamRouter.router_ = function(args, originalMethod) { } stream.once('reading', function() { - originalMethod.apply(null, args.concat(onResultSet)); + if (util.is(lastArgument, 'undefined')) { + // Replace it with onResultSet. + args.splice(args.length - 1, 1, onResultSet); + } else { + args = args.concat(onResultSet); + } + + originalMethod.apply(null, args); }); return stream; diff --git a/test/common/stream-router.js b/test/common/stream-router.js index c4de9f9ff0e..bcf9b9d8ecd 100644 --- a/test/common/stream-router.js +++ b/test/common/stream-router.js @@ -139,6 +139,9 @@ describe('streamRouter', function() { assert.strictEqual(args[index], arg); }); + // The callback should have been appended to the original arguments. + assert.strictEqual(args.length, ARGS_WITHOUT_CALLBACK.length + 1); + done(); } @@ -146,6 +149,21 @@ describe('streamRouter', function() { rs.on('data', util.noop); // Trigger the underlying `_read` event. }); + it('should replace last argument if it is undefined', function(done) { + var argsWithHangingUndefined = ARGS_WITHOUT_CALLBACK.concat(undefined); + + function originalMethod() { + // If the last argument was replaced, the arguments array length will + // not have increased. + assert.strictEqual(arguments.length, argsWithHangingUndefined.length); + + done(); + } + + var rs = streamRouter.router_(argsWithHangingUndefined, originalMethod); + rs.on('data', util.noop); // Trigger the underlying `_read` event. + }); + it('should emit an error if one occurs', function(done) { var error = new Error('Error.'); From 021ecbcfa61cda650cd38f8c2e5b0848d358ea6d Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Thu, 25 Jun 2015 16:40:46 -0400 Subject: [PATCH 3/9] streamrouter: implement across pubsub --- lib/pubsub/index.js | 39 +++++++++++++++++++++++++++++++++++- lib/pubsub/topic.js | 16 +++++++++++++-- system-test/pubsub.js | 46 +++++++++++++++++++++++++++++++++++++++++++ test/pubsub/index.js | 20 +++++++++++++++++++ 4 files changed, 118 insertions(+), 3 deletions(-) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 727c473da01..5c5ac1026f5 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -26,6 +26,12 @@ */ var Subscription = require('./subscription.js'); +/** + * @type {module:common/streamrouter} + * @private + */ +var streamRouter = require('../common/stream-router.js'); + /** * @type {module:pubsub/topic} * @private @@ -121,6 +127,18 @@ function PubSub(options) { * pubsub.getTopics({ * pageSize: 3 * }, function(err, topics, nextQuery, apiResponse) {}); + * + * //- + * // Get the topics as a readable object stream. + * //- + * pubsub.getTopics() + * .on('error', console.error) + * .on('data', function(topic) { + * // topic is a Topic object. + * }) + * .on('end', function() { + * // All topics retrieved. + * }); */ PubSub.prototype.getTopics = function(query, callback) { var self = this; @@ -410,11 +428,23 @@ PubSub.prototype.topic = function(name, options) { * pubsub.getSubscriptions({ * pageSize: 3 * }, callback); + * + * //- + * // Get the subscriptions as a readable object stream. + * //- + * pubsub.getSubscriptions() + * .on('error', console.error) + * .on('data', function(subscription) { + * // subscription is a Subscription object. + * }) + * .on('end', function() { + * // All subscriptions retrieved. + * }); */ PubSub.prototype.getSubscriptions = function(options, callback) { var self = this; - if (!callback) { + if (util.is(options, 'function')) { callback = options; options = {}; } @@ -496,4 +526,11 @@ PubSub.prototype.makeReq_ = function(method, path, q, body, callback) { this.makeAuthorizedRequest_(reqOpts, callback); }; +/*! Developer Documentation + * + * These methods can be used with either a callback or as a readable object + * stream. `streamRouter` is used to add this dual behavior. + */ +streamRouter.extend(PubSub, ['getSubscriptions', 'getTopics']); + module.exports = PubSub; diff --git a/lib/pubsub/topic.js b/lib/pubsub/topic.js index c729b355aee..cdb9732d43a 100644 --- a/lib/pubsub/topic.js +++ b/lib/pubsub/topic.js @@ -206,9 +206,21 @@ Topic.prototype.delete = function(callback) { * topic.getSubscriptions({ * pageSize: 3 * }, callback); + * + * //- + * // Get the subscriptions for this topic as a readable object stream. + * //- + * topic.getSubscriptions() + * .on('error', console.error) + * .on('data', function(subscription) { + * // subscription is a Subscription object. + * }) + * .on('end', function() { + * // All subscriptions retrieved. + * }); */ Topic.prototype.getSubscriptions = function(options, callback) { - if (!callback) { + if (util.is(options, 'function')) { callback = options; options = {}; } @@ -216,7 +228,7 @@ Topic.prototype.getSubscriptions = function(options, callback) { options = options || {}; options.topic = this; - this.pubsub.getSubscriptions(options, callback); + return this.pubsub.getSubscriptions(options, callback); }; /** diff --git a/system-test/pubsub.js b/system-test/pubsub.js index 7389723bbb6..51221bcd5cc 100644 --- a/system-test/pubsub.js +++ b/system-test/pubsub.js @@ -77,6 +77,24 @@ describe('pubsub', function() { }); }); + it('should list topics in a stream', function(done) { + var topicsEmitted = []; + + pubsub.getTopics() + .on('error', done) + .on('data', function(topic) { + topicsEmitted.push(topic); + }) + .on('end', function() { + var results = topicsEmitted.filter(function(topic) { + return TOPIC_FULL_NAMES.indexOf(topic.name) !== -1; + }); + + assert.equal(results.length, TOPIC_NAMES.length); + done(); + }); + }); + it('should return a nextQuery if there are more results', function(done) { pubsub.getTopics({ pageSize: TOPIC_NAMES.length - 1 @@ -183,6 +201,20 @@ describe('pubsub', function() { }); }); + it('should list all topic subscriptions as a stream', function(done) { + var subscriptionsEmitted = []; + + topic.getSubscriptions() + .on('error', done) + .on('data', function(subscription) { + subscriptionsEmitted.push(subscription); + }) + .on('end', function() { + assert.equal(subscriptionsEmitted.length, SUBSCRIPTIONS.length); + done(); + }); + }); + it('should list all subscriptions regardless of topic', function(done) { pubsub.getSubscriptions(function(err, subscriptions) { assert.ifError(err); @@ -191,6 +223,20 @@ describe('pubsub', function() { }); }); + it('should list all subscriptions as a stream', function(done) { + var subscriptionEmitted = false; + + pubsub.getSubscriptions() + .on('error', done) + .on('data', function(subscription) { + subscriptionEmitted = subscription instanceof Subscription; + }) + .on('end', function() { + assert.strictEqual(subscriptionEmitted, true); + done(); + }); + }); + it('should allow creation and deletion of a subscription', function(done) { var subName = generateSubName(); topic.subscribe(subName, function(err, sub) { diff --git a/test/pubsub/index.js b/test/pubsub/index.js index 01527e39a78..1f0e94f5fea 100644 --- a/test/pubsub/index.js +++ b/test/pubsub/index.js @@ -20,6 +20,7 @@ var assert = require('assert'); var mockery = require('mockery'); var request = require('request'); var Topic = require('../../lib/pubsub/topic.js'); +var util = require('../../lib/common/util.js'); var SubscriptionCached = require('../../lib/pubsub/subscription.js'); var SubscriptionOverride; @@ -40,12 +41,27 @@ fakeRequest.defaults = function() { return fakeRequest; }; +var extended = false; +var fakeStreamRouter = { + extend: function(Class, methods) { + if (Class.name !== 'PubSub') { + return; + } + + methods = util.arrayize(methods); + assert.equal(Class.name, 'PubSub'); + assert.deepEqual(methods, ['getSubscriptions', 'getTopics']); + extended = true; + } +}; + describe('PubSub', function() { var PubSub; var PROJECT_ID = 'test-project'; var pubsub; before(function() { + mockery.registerMock('../common/stream-router.js', fakeStreamRouter); mockery.registerMock('./subscription.js', Subscription); mockery.registerMock('./topic.js', Topic); mockery.registerMock('request', fakeRequest); @@ -71,6 +87,10 @@ describe('PubSub', function() { }); describe('instantiation', function() { + it('should extend the correct methods', function() { + assert(extended); // See `fakeStreamRouter.extend` + }); + it('should throw if a projectId is not specified', function() { assert.throws(function() { new PubSub(); From 46c993dc76f13003bc101add119b833053eafccb Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Fri, 26 Jun 2015 14:46:58 -0400 Subject: [PATCH 4/9] streamrouter: honor query.limitVal & query.maxResults --- lib/common/stream-router.js | 31 ++++++++++++++++++++++++++----- test/common/stream-router.js | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/lib/common/stream-router.js b/lib/common/stream-router.js index 0885098e99f..45a95ae8d83 100644 --- a/lib/common/stream-router.js +++ b/lib/common/stream-router.js @@ -81,7 +81,10 @@ streamRouter.extend = function(Class, methodNames) { */ streamRouter.router_ = function(args, originalMethod) { args = util.toArray(args); + + var firstArgument = args[0]; var lastArgument = args[args.length - 1]; + var isStreamMode = !util.is(lastArgument, 'function'); if (!isStreamMode) { @@ -103,6 +106,19 @@ streamRouter.router_ = function(args, originalMethod) { _end.apply(this, arguments); }; + var resultsToSend = -1; + if (util.is(firstArgument, 'object')) { + // `firstArgument` is a query object. Check if the user only asked for a + // certain amount of results. + if (util.is(firstArgument.maxResults, 'number')) { + // `maxResults` is used API-wide. + resultsToSend = firstArgument.maxResults; + } else if (util.is(firstArgument.limitVal, 'number')) { + // `limitVal` is part of a Datastore query. + resultsToSend = firstArgument.limitVal; + } + } + function onResultSet(err, results, nextQuery) { if (err) { stream.emit('error', err); @@ -110,16 +126,21 @@ streamRouter.router_ = function(args, originalMethod) { return; } - results.forEach(function(result) { - if (!streamEnded) { - stream.push(result); - } - }); + var result; + while ((result = results.shift()) && resultsToSend !== 0 && !streamEnded) { + stream.push(result); + resultsToSend--; + } if (streamEnded) { return; } + if (resultsToSend === 0) { + stream.end(); + return; + } + if (nextQuery) { originalMethod(nextQuery, onResultSet); } else { diff --git a/test/common/stream-router.js b/test/common/stream-router.js index bcf9b9d8ecd..86ef7f7530d 100644 --- a/test/common/stream-router.js +++ b/test/common/stream-router.js @@ -198,11 +198,44 @@ describe('streamRouter', function() { resultsReceived.push(result); }); rs.on('end', function() { - assert.deepEqual(results, resultsReceived); + assert.deepEqual(resultsReceived, ['a', 'b', 'c']); done(); }); }); + describe('limits', function() { + var limit = 1; + + function originalMethod() { + var callback = [].slice.call(arguments).pop(); + setImmediate(function() { + callback(null, [1, 2, 3]); + }); + } + + it('should respect query.maxResults', function(done) { + var numResultsReceived = 0; + + streamRouter.router_([{ maxResults: limit }], originalMethod) + .on('data', function() { numResultsReceived++; }) + .on('end', function() { + assert.strictEqual(numResultsReceived, limit); + done(); + }); + }); + + it('should respect query.limitVal', function(done) { + var numResultsReceived = 0; + + streamRouter.router_([{ limitVal: limit }], originalMethod) + .on('data', function() { numResultsReceived++; }) + .on('end', function() { + assert.strictEqual(numResultsReceived, limit); + done(); + }); + }); + }); + it('should get more results if nextQuery exists', function(done) { var nextQuery = { a: 'b', c: 'd' }; var nextQuerySent = false; From d09775305d204b0a62e73f5b2f25fc690f8cc6af Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Fri, 26 Jun 2015 14:47:20 -0400 Subject: [PATCH 5/9] streamrouter: implement across datastore --- lib/datastore/request.js | 127 ++++++++++++++++---------------------- system-test/datastore.js | 52 +++++++++------- test/datastore/request.js | 120 ++++++++++------------------------- 3 files changed, 113 insertions(+), 186 deletions(-) diff --git a/lib/datastore/request.js b/lib/datastore/request.js index 56feb4b9979..244436ebe94 100644 --- a/lib/datastore/request.js +++ b/lib/datastore/request.js @@ -20,13 +20,11 @@ 'use strict'; -var streamEvents = require('stream-events'); var request = require('request').defaults({ pool: { maxSockets: Infinity } }); -var through = require('through2'); /** * @type {module:datastore/entity} @@ -40,6 +38,12 @@ var entity = require('./entity.js'); */ var pb = require('./pb.js'); +/** + * @type {module:common/streamrouter} + * @private + */ +var streamRouter = require('../common/stream-router.js'); + /** * @type {module:common/util} * @private @@ -461,9 +465,10 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * supported. * * If you provide a callback, the query is run, and the results are returned as - * the second argument to your callback. A third argument will also exist, which - * is the `endCursor` of the previously-run query. You can use this to extend - * the query you just ran to see if more results exist. + * the second argument to your callback. A third argument may also exist, which + * is a query object that uses the end cursor from the previous query as the + * starting cursor for the next query. You can pass that object back to this + * method to see if more results exist. * * You may also omit the callback to this function to trigger streaming mode. * @@ -472,14 +477,14 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * @param {module:datastore/query} q - Query object. * @param {function=} callback - The callback function. If omitted, a readable * stream instance is returned. - * @param {?Error} callback.err - An error returned while making this request + * @param {?error} callback.err - An error returned while making this request * (may be null). - * @param {Array} callback.entities - The list of entities returned by this + * @param {array} callback.entities - The list of entities returned by this * query. Note that this is a single page of entities, not necessarily * all of the entities. - * @param {String} callback.endCursor - The end cursor of this current query, - * which can be used as the starting cursor of the next query. - * @param {Object} callback.apiResponse - The full API response. + * @param {?module:datastore/query} callback.nextQuery - If present, run another + * query with this object to check for more results. + * @param {object} callback.apiResponse - The full API response. * * @example * //- @@ -488,13 +493,14 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * //- * var query = dataset.createQuery('Lion'); * - * // Retrieve 5 companies. - * transaction.runQuery(query, function(err, entities, endCursor, apiResponse) { - * // Use `endCursor` as the starting cursor for your next query. - * var nextQuery = query.start(endCursor); - * var callback = function(err, entities, endCursor, apiResponse) {}; - * transaction.runQuery(nextQuery, callback); - * }); + * var callback = function(err, entities, nextQuery, apiResponse) { + * if (nextQuery) { + * // More results might exist. + * transaction.runQuery(nextQuery, callback); + * } + * }; + * + * transaction.runQuery(query, callback); * * //- * // If you omit the callback, runQuery will automatically call subsequent @@ -502,7 +508,11 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * // found. * //- * transaction.runQuery(query) - * .on('data', function (entity) {}); + * .on('error', console.error) + * .on('data', function (entity) {}) + * .on('end', function() { + * // All entities retrieved. + * }); * * //- * // A keys-only query returns just the keys of the result entities instead of @@ -515,74 +525,34 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * // entities[].data = Empty object * }); */ -DatastoreRequest.prototype.runQuery = function(q, callback) { - var that = this; - var stream; - var resultsToSend = q.limitVal; - +DatastoreRequest.prototype.runQuery = function(query, callback) { var req = { read_options: {}, - query: entity.queryToQueryProto(q) + query: entity.queryToQueryProto(query) }; - if (q.namespace) { + if (query.namespace) { req.partition_id = { - namespace: q.namespace + namespace: query.namespace }; } - if (!util.is(callback, 'function')) { - stream = streamEvents(through.obj()); - stream.once('reading', runQuery); - return stream; - } else { - runQuery(); - } - - function runQuery() { - that.makeReq_('runQuery', req, function(err, resp) { - if (err) { - if (stream) { - stream.emit('error', err, resp); - stream.end(); - } else { - callback(err, null, null, resp); - } - return; - } - - var entities = entity.formatArray(resp.batch.entity_result); - - var cursor = ''; - if (resp.batch.end_cursor) { - cursor = resp.batch.end_cursor.toBase64(); - } - - if (!stream) { - callback(null, entities, cursor, resp); - return; - } - - if (!cursor || entities.length === 0) { - stream.end(); - return; - } + this.makeReq_('runQuery', req, function(err, resp) { + if (err) { + callback(err, null, null, resp); + return; + } - var result; - while ((result = entities.shift()) && resultsToSend !== 0) { - stream.push(result); - resultsToSend--; - } + var entities = entity.formatArray(resp.batch.entity_result); + var nextQuery = null; - if (resultsToSend === 0) { - stream.end(); - return; - } + if (resp.batch.end_cursor && entities.length > 0) { + var endCursor = resp.batch.end_cursor.toBase64(); + nextQuery = query.start(endCursor).offset(0); + } - req.query = entity.queryToQueryProto(q.start(cursor).offset(0)); - runQuery(); - }); - } + callback(null, entities, nextQuery, resp); + }); }; /** @@ -760,4 +730,11 @@ DatastoreRequest.prototype.makeReq_ = function(method, body, callback) { }); }; +/*! Developer Documentation + * + * This method can be used with either a callback or as a readable object + * stream. `streamRouter` is used to add this dual behavior. + */ +streamRouter.extend(DatastoreRequest, 'runQuery'); + module.exports = DatastoreRequest; diff --git a/system-test/datastore.js b/system-test/datastore.js index e0636ec80ed..48853bb3fbd 100644 --- a/system-test/datastore.js +++ b/system-test/datastore.js @@ -280,17 +280,17 @@ describe('datastore', function() { }); it('should limit queries', function(done) { - var q = ds.createQuery('Character').hasAncestor(ancestor) - .limit(5); - ds.runQuery(q, function(err, firstEntities, firstEndCursor) { + var q = ds.createQuery('Character').hasAncestor(ancestor).limit(5); + + ds.runQuery(q, function(err, firstEntities, secondQuery) { assert.ifError(err); assert.equal(firstEntities.length, 5); - var secondQ = q.start(firstEndCursor); - ds.runQuery(secondQ, function(err, secondEntities, secondEndCursor) { + + ds.runQuery(secondQuery, function(err, secondEntities, thirdQuery) { assert.ifError(err); assert.equal(secondEntities.length, 3); - var thirdQ = q.start(secondEndCursor); - ds.runQuery(thirdQ, function(err, thirdEntities) { + + ds.runQuery(thirdQuery, function(err, thirdEntities) { assert.ifError(err); assert.equal(thirdEntities.length, 0); done(); @@ -398,17 +398,22 @@ describe('datastore', function() { }); it('should paginate with offset and limit', function(done) { - var q = ds.createQuery('Character').hasAncestor(ancestor) - .offset(2) - .limit(3) - .order('appearances'); - ds.runQuery(q, function(err, entities, endCursor) { + var q = ds.createQuery('Character') + .hasAncestor(ancestor) + .offset(2) + .limit(3) + .order('appearances'); + + ds.runQuery(q, function(err, entities, secondQuery) { assert.ifError(err); + assert.equal(entities.length, 3); assert.equal(entities[0].data.name, 'Robb'); assert.equal(entities[2].data.name, 'Catelyn'); - var secondQuery = q.start(endCursor).offset(0); - ds.runQuery(secondQuery, function(err, secondEntities) { + + ds.runQuery(secondQuery.offset(0), function(err, secondEntities) { + assert.ifError(err); + assert.equal(secondEntities.length, 3); assert.equal(secondEntities[0].data.name, 'Sansa'); assert.equal(secondEntities[2].data.name, 'Arya'); @@ -418,17 +423,16 @@ describe('datastore', function() { }); it('should resume from a start cursor', function(done) { - var q = ds.createQuery('Character').hasAncestor(ancestor) - .offset(2) - .limit(2) - .order('appearances'); - ds.runQuery(q, function(err, entities, endCursor) { + var q = ds.createQuery('Character') + .hasAncestor(ancestor) + .offset(2) + .limit(2) + .order('appearances'); + + ds.runQuery(q, function(err, entities, nextQuery) { assert.ifError(err); - var cursorQuery = - ds.createQuery('Character').hasAncestor(ancestor) - .order('appearances') - .start(endCursor); - ds.runQuery(cursorQuery, function(err, secondEntities) { + + ds.runQuery(nextQuery.limit(-1), function(err, secondEntities) { assert.ifError(err); assert.equal(secondEntities.length, 4); assert.equal(secondEntities[0].data.name, 'Catelyn'); diff --git a/test/datastore/request.js b/test/datastore/request.js index 9cbb54298d3..6e384da3ed1 100644 --- a/test/datastore/request.js +++ b/test/datastore/request.js @@ -56,6 +56,20 @@ pb.FakeMethodResponse = { } }; +var extended = false; +var fakeStreamRouter = { + extend: function(Class, methods) { + if (Class.name !== 'DatastoreRequest') { + return; + } + + methods = util.arrayize(methods); + assert.equal(Class.name, 'DatastoreRequest'); + assert.deepEqual(methods, ['runQuery']); + extended = true; + } +}; + describe('Request', function() { var Request; var key; @@ -64,6 +78,7 @@ describe('Request', function() { before(function() { mockery.registerMock('./pb.js', pb); + mockery.registerMock('../common/stream-router.js', fakeStreamRouter); mockery.registerMock('request', fakeRequest); mockery.enable({ useCleanCache: true, @@ -90,8 +105,18 @@ describe('Request', function() { }; }); - it('should have set correct defaults on Request', function() { - assert.deepEqual(REQUEST_DEFAULT_CONF, { pool: { maxSockets: Infinity } }); + describe('instantiation', function() { + it('should extend the correct methods', function() { + assert(extended); // See `fakeStreamRouter.extend` + }); + + it('should have set correct defaults on Request', function() { + assert.deepEqual(REQUEST_DEFAULT_CONF, { + pool: { + maxSockets: Infinity + } + }); + }); }); describe('get', function() { @@ -469,110 +494,31 @@ describe('Request', function() { }); }); - it('should return an empty string if no end cursor exists', function(done) { + it('should return null nextQuery if no end cursor exists', function(done) { request.makeReq_ = function(method, req, callback) { callback(null, mockResponse.withResults); }; - request.runQuery(query, function(err, entities, endCursor) { + request.runQuery(query, function(err, entities, nextQuery) { assert.ifError(err); - assert.strictEqual(endCursor, ''); + assert.strictEqual(nextQuery, null); done(); }); }); - it('should return the end cursor from the last query', function(done) { + it('should return a nextQuery', function(done) { var response = mockResponse.withResultsAndEndCursor; request.makeReq_ = function(method, req, callback) { callback(null, response); }; - request.runQuery(query, function(err, entities, endCursor) { + request.runQuery(query, function(err, entities, nextQuery) { assert.ifError(err); - assert.equal(endCursor, response.batch.end_cursor.toBase64()); + assert.equal(nextQuery.startVal, response.batch.end_cursor.toBase64()); done(); }); }); - - describe('streams', function() { - it('should be a stream if a callback is omitted', function() { - assert(request.runQuery(query) instanceof stream.Stream); - }); - - it('should run the query after being read from', function(done) { - request.makeReq_ = function() { - done(); - }; - - request.runQuery(query).emit('reading'); - }); - - it('should continuosly run until there are no results', function(done) { - var run = 0; - var timesToRun = 2; - - request.makeReq_ = function(method, req, callback) { - run++; - - if (run < timesToRun) { - callback(null, mockResponse.withResultsAndEndCursor); - } else { - var lastEndCursor = - mockResponse.withResultsAndEndCursor.batch.end_cursor.toBase64(); - lastEndCursor = new Buffer(lastEndCursor, 'base64').toString(); - - assert.equal(String(req.query.start_cursor), lastEndCursor); - assert.strictEqual(req.query.offset, undefined); - - callback(null, mockResponse.withResults); - } - }; - - var resultsReturned = 0; - - request.runQuery(query) - .on('data', function() { resultsReturned++; }) - .on('end', function() { - assert.equal(resultsReturned, mockRespGet.found.length); - done(); - }); - }); - - it('should only emit the limited number of results', function(done) { - var limit = 2; - - query.limitVal = limit; - - request.makeReq_ = function(method, req, callback) { - callback(null, mockResponse.withResultsAndEndCursor); - }; - - var resultsReturned = 0; - - request.runQuery(query) - .on('data', function() { resultsReturned++; }) - .on('end', function() { - assert.equal(resultsReturned, limit); - done(); - }); - }); - - it('should emit an error', function(done) { - var error = new Error('Error.'); - - request.makeReq_ = function(method, req, callback) { - callback(error); - }; - - request.runQuery(query) - .on('error', function(err) { - assert.equal(err, error); - done(); - }) - .emit('reading'); - }); - }); }); describe('update', function() { From 49e0f0fa47737145450dc6adc2293d4716e2e47b Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Fri, 26 Jun 2015 16:50:31 -0400 Subject: [PATCH 6/9] streamrouter: implement across storage --- lib/storage/bucket.js | 25 ++++++++++++++++++++++ lib/storage/index.js | 25 ++++++++++++++++++++++ system-test/storage.js | 32 +++++++++++++++++++++++++++- test/storage/bucket.js | 21 ++++++++++++++++++- test/storage/index.js | 47 ++++++++++++++++++++++++++++++++++++++---- 5 files changed, 144 insertions(+), 6 deletions(-) diff --git a/lib/storage/bucket.js b/lib/storage/bucket.js index a4593776593..ad6d47f16b0 100644 --- a/lib/storage/bucket.js +++ b/lib/storage/bucket.js @@ -38,6 +38,12 @@ var Acl = require('./acl.js'); */ var File = require('./file.js'); +/** + * @type {module:common/streamrouter} + * @private + */ +var streamRouter = require('../common/stream-router.js'); + /** * @type {module:common/util} * @private @@ -496,6 +502,18 @@ Bucket.prototype.file = function(name, options) { * }, function(err, files, nextQuery, apiResponse) { * // Each file is scoped to its generation. * }); + * + * //- + * // Get the files from your bucket as a readable object stream. + * //- + * bucket.getFiles() + * .on('error', console.error) + * .on('data', function(file) { + * // file is a File object. + * }) + * .on('end', function() { + * // All files retrieved. + * }); */ Bucket.prototype.getFiles = function(query, callback) { var self = this; @@ -1051,4 +1069,11 @@ Bucket.prototype.makeReq_ = function(method, path, query, body, callback) { this.storage.makeAuthorizedRequest_(reqOpts, callback); }; +/*! Developer Documentation + * + * This method can be used with either a callback or as a readable object + * stream. `streamRouter` is used to add this dual behavior. + */ +streamRouter.extend(Bucket, 'getFiles'); + module.exports = Bucket; diff --git a/lib/storage/index.js b/lib/storage/index.js index e41a9ee8654..ebd2812d3a9 100644 --- a/lib/storage/index.js +++ b/lib/storage/index.js @@ -28,6 +28,12 @@ var extend = require('extend'); */ var Bucket = require('./bucket.js'); +/** + * @type {module:common/streamrouter} + * @private + */ +var streamRouter = require('../common/stream-router.js'); + /** * @type {module:common/util} * @private @@ -272,6 +278,18 @@ Storage.prototype.createBucket = function(name, metadata, callback) { * gcs.getBuckets({ * maxResults: 5 * }, function(err, buckets, nextQuery, apiResponse) {}); + * + * //- + * // Get the buckets from your project as a readable object stream. + * //- + * gcs.getBuckets() + * .on('error', console.error) + * .on('data', function(bucket) { + * // bucket is a Bucket object. + * }) + * .on('end', function() { + * // All buckets retrieved. + * }); */ Storage.prototype.getBuckets = function(query, callback) { var that = this; @@ -324,4 +342,11 @@ Storage.prototype.makeReq_ = function(method, path, query, body, callback) { this.makeAuthorizedRequest_(reqOpts, callback); }; +/*! Developer Documentation + * + * This method can be used with either a callback or as a readable object + * stream. `streamRouter` is used to add this dual behavior. + */ +streamRouter.extend(Storage, 'getBuckets'); + module.exports = Storage; diff --git a/system-test/storage.js b/system-test/storage.js index 4a14c6da833..67aab6d66d6 100644 --- a/system-test/storage.js +++ b/system-test/storage.js @@ -18,12 +18,14 @@ var assert = require('assert'); var async = require('async'); +var Bucket = require('../lib/storage/bucket.js'); var crypto = require('crypto'); +var File = require('../lib/storage/file.js'); var fs = require('fs'); var request = require('request'); var through = require('through2'); var tmp = require('tmp'); -var util = require('../lib/common/util'); +var util = require('../lib/common/util.js'); var uuid = require('node-uuid'); var prop = util.prop; @@ -419,6 +421,20 @@ describe('storage', function() { done(); } }); + + it('should get buckets as a stream', function(done) { + var bucketEmitted = false; + + storage.getBuckets() + .on('error', done) + .on('data', function(bucket) { + bucketEmitted = bucket instanceof Bucket; + }) + .on('end', function() { + assert.strictEqual(bucketEmitted, true); + done(); + }); + }); }); describe('bucket metadata', function() { @@ -699,6 +715,20 @@ describe('storage', function() { }); }); + it('should get files as a stream', function(done) { + var fileEmitted = false; + + bucket.getFiles() + .on('error', done) + .on('data', function(file) { + fileEmitted = file instanceof File; + }) + .on('end', function() { + assert.strictEqual(fileEmitted, true); + done(); + }); + }); + it('should paginate the list', function(done) { var query = { maxResults: filenames.length - 1 }; bucket.getFiles(query, function(err, files, nextQuery) { diff --git a/test/storage/bucket.js b/test/storage/bucket.js index 1f12efeca2f..0fd9b341788 100644 --- a/test/storage/bucket.js +++ b/test/storage/bucket.js @@ -63,6 +63,20 @@ fakeAsync.eachLimit = function() { (eachLimit_Override || async.eachLimit).apply(null, arguments); }; +var extended = false; +var fakeStreamRouter = { + extend: function(Class, methods) { + if (Class.name !== 'Bucket') { + return; + } + + methods = util.arrayize(methods); + assert.equal(Class.name, 'Bucket'); + assert.deepEqual(methods, ['getFiles']); + extended = true; + } +}; + describe('Bucket', function() { var Bucket; var BUCKET_NAME = 'test-bucket'; @@ -75,6 +89,7 @@ describe('Bucket', function() { before(function() { mockery.registerMock('./file.js', FakeFile); + mockery.registerMock('../common/stream-router.js', fakeStreamRouter); mockery.registerMock('async', fakeAsync); mockery.registerMock('request', fakeRequest); mockery.enable({ @@ -95,7 +110,11 @@ describe('Bucket', function() { bucket = new Bucket(options, BUCKET_NAME); }); - describe('initialization', function() { + describe('instantiation', function() { + it('should extend the correct methods', function() { + assert(extended); // See `fakeStreamRouter.extend` + }); + it('should re-use provided connection', function() { assert.deepEqual(bucket.authorizeReq_, options.authorizeReq_); }); diff --git a/test/storage/index.js b/test/storage/index.js index 1f632e1018f..9bd6559949a 100644 --- a/test/storage/index.js +++ b/test/storage/index.js @@ -14,24 +14,63 @@ * limitations under the License. */ -/*global describe, it, beforeEach */ - 'use strict'; +// If we don't stub see4_crc32 and use mockery, we get "Module did not self- +// register". +var crc = require('sse4_crc32'); + var assert = require('assert'); -var Bucket = require('../../lib/storage/bucket.js'); var extend = require('extend'); -var Storage = require('../../lib/storage'); +var mockery = require('mockery'); + var util = require('../../lib/common/util.js'); +var extended = false; +var fakeStreamRouter = { + extend: function(Class, methods) { + if (Class.name !== 'Storage') { + return; + } + + methods = util.arrayize(methods); + assert.equal(Class.name, 'Storage'); + assert.deepEqual(methods, ['getBuckets']); + extended = true; + } +}; + describe('Storage', function() { + var Storage; var storage; + var Bucket; + + before(function() { + mockery.registerMock('sse4_crc32', crc); + mockery.registerMock('../common/stream-router.js', fakeStreamRouter); + mockery.enable({ + useCleanCache: true, + warnOnUnregistered: false + }); + + Bucket = require('../../lib/storage/bucket.js'); + Storage = require('../../lib/storage'); + }); + + after(function() { + mockery.deregisterAll(); + mockery.disable(); + }); beforeEach(function() { storage = new Storage({ projectId: 'project-id' }); }); describe('instantiation', function() { + it('should extend the correct methods', function() { + assert(extended); // See `fakeStreamRouter.extend` + }); + it('should throw if a projectId is not specified', function() { assert.throws(function() { new Storage(); From af8299a2cbf0529d378b14a799ceb61ae37c25ae Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Tue, 30 Jun 2015 12:28:31 -0400 Subject: [PATCH 7/9] streamrouter: document methods to instruct user on how to exit early --- lib/bigquery/index.js | 40 +++++++++++++++++++++++++++++++-------- lib/bigquery/table.js | 19 +++++++++++++++---- lib/pubsub/index.js | 18 ++++++++++++++++++ lib/pubsub/topic.js | 9 +++++++++ lib/search/index-class.js | 9 +++++++++ lib/search/index.js | 9 +++++++++ lib/storage/bucket.js | 9 +++++++++ lib/storage/index.js | 9 +++++++++ 8 files changed, 110 insertions(+), 12 deletions(-) diff --git a/lib/bigquery/index.js b/lib/bigquery/index.js index 9df378425de..306f6b44aba 100644 --- a/lib/bigquery/index.js +++ b/lib/bigquery/index.js @@ -168,9 +168,14 @@ BigQuery.prototype.dataset = function(id) { * @param {function} callback - The callback function. * * @example - * bigquery.getDatasets(function(err, datasets, nextQuery, apiResponse) { - * // If `nextQuery` is non-null, there are more results to fetch. - * }); + * var callback = function(err, datasets, nextQuery, apiResponse) { + * if (nextQuery) { + * // More results exist. + * bigquery.getDatasets(nextQuery, callback); + * } + * }; + * + * bigquery.getDatasets(callback); * * //- * // Get the datasets from your project as a readable object stream. @@ -183,6 +188,15 @@ BigQuery.prototype.dataset = function(id) { * .on('end', function() { * // All datasets retrieved. * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * bigquery.getDatasets() + * .on('data', function(dataset) { + * this.end(); + * }); */ BigQuery.prototype.getDatasets = function(query, callback) { var that = this; @@ -250,6 +264,15 @@ BigQuery.prototype.getDatasets = function(query, callback) { * .on('end', function() { * // All jobs retrieved. * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * bigquery.getJobs() + * .on('data', function(job) { + * this.end(); + * }); */ BigQuery.prototype.getJobs = function(options, callback) { var that = this; @@ -330,14 +353,15 @@ BigQuery.prototype.job = function(id) { * //- * // You can run a query against your data in a serial manner. * //- - * bigquery.query(query, function(err, rows, nextQuery, apiResponse) { + * var callback = function(err, rows, nextQuery, apiResponse) { * // Handle results here. + * * if (nextQuery) { - * bigquery.query(nextQuery, function(err, rows, nextQuery, apiResponse) { - * // Handle more results here. - * }); + * bigquery.query(nextQuery, callback); * } - * }); + * }; + * + * bigquery.query(query, callback); * * //- * // You can also use the `query` method as a readable object stream by diff --git a/lib/bigquery/table.js b/lib/bigquery/table.js index 814cc0bdacb..627036bb395 100644 --- a/lib/bigquery/table.js +++ b/lib/bigquery/table.js @@ -491,12 +491,14 @@ Table.prototype.getMetadata = function(callback) { * maxResults: 100 * }; * - * table.getRows(options, function(err, rows, nextQuery, apiResponse) { - * // If `nextQuery` is non-null, there are more results to fetch. + * var callback = function(err, rows, nextQuery, apiResponse) { * if (nextQuery) { - * table.getRows(nextQuery, function(err, rows, nextQuery, apiResponse) {}); + * // More results exist. + * table.getRows(nextQuery, callback); * } - * }); + * }; + * + * table.getRows(options, callback); * * //- * // Get the rows as a readable object stream. @@ -507,6 +509,15 @@ Table.prototype.getMetadata = function(callback) { * .on('end', function() { * // All rows have been retrieved. * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * table.getRows() + * .on('data', function(row) { + * this.end(); + * }); */ Table.prototype.getRows = function(options, callback) { var that = this; diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 5c5ac1026f5..fcdd1f0a781 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -139,6 +139,15 @@ function PubSub(options) { * .on('end', function() { * // All topics retrieved. * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * pubsub.getTopics() + * .on('data', function(topic) { + * this.end(); + * }); */ PubSub.prototype.getTopics = function(query, callback) { var self = this; @@ -440,6 +449,15 @@ PubSub.prototype.topic = function(name, options) { * .on('end', function() { * // All subscriptions retrieved. * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * pubsub.getSubscriptions() + * .on('data', function(topic) { + * this.end(); + * }); */ PubSub.prototype.getSubscriptions = function(options, callback) { var self = this; diff --git a/lib/pubsub/topic.js b/lib/pubsub/topic.js index cdb9732d43a..602ce68f810 100644 --- a/lib/pubsub/topic.js +++ b/lib/pubsub/topic.js @@ -218,6 +218,15 @@ Topic.prototype.delete = function(callback) { * .on('end', function() { * // All subscriptions retrieved. * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * topic.getSubscriptions() + * .on('data', function(subscription) { + * this.end(); + * }); */ Topic.prototype.getSubscriptions = function(options, callback) { if (util.is(options, 'function')) { diff --git a/lib/search/index-class.js b/lib/search/index-class.js index 3226e075489..90cd318df2f 100644 --- a/lib/search/index-class.js +++ b/lib/search/index-class.js @@ -190,6 +190,15 @@ Index.prototype.document = function(id) { * .on('end', function() { * // All documents retrieved. * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * index.getDocuments() + * .on('data', function(document) { + * this.end(); + * }); */ Index.prototype.getDocuments = function(query, callback) { var self = this; diff --git a/lib/search/index.js b/lib/search/index.js index 15e16f3ab71..0e59745b256 100644 --- a/lib/search/index.js +++ b/lib/search/index.js @@ -138,6 +138,15 @@ function Search(options) { * .on('end', function() { * // All indexes retrieved. * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * search.getIndexes() + * .on('data', function(index) { + * this.end(); + * }); */ Search.prototype.getIndexes = function(query, callback) { var self = this; diff --git a/lib/storage/bucket.js b/lib/storage/bucket.js index ad6d47f16b0..69ecf1d69e6 100644 --- a/lib/storage/bucket.js +++ b/lib/storage/bucket.js @@ -514,6 +514,15 @@ Bucket.prototype.file = function(name, options) { * .on('end', function() { * // All files retrieved. * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * bucket.getFiles() + * .on('data', function(file) { + * this.end(); + * }); */ Bucket.prototype.getFiles = function(query, callback) { var self = this; diff --git a/lib/storage/index.js b/lib/storage/index.js index ebd2812d3a9..2100e231121 100644 --- a/lib/storage/index.js +++ b/lib/storage/index.js @@ -290,6 +290,15 @@ Storage.prototype.createBucket = function(name, metadata, callback) { * .on('end', function() { * // All buckets retrieved. * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * gcs.getBuckets() + * .on('data', function(bucket) { + * this.end(); + * }); */ Storage.prototype.getBuckets = function(query, callback) { var that = this; From 88fd2fabab85c5ea7072fd0e85d84a7cdfe163a9 Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Sat, 11 Jul 2015 14:30:19 -0400 Subject: [PATCH 8/9] streamrouter: support `autoPaginate: true` --- lib/bigquery/index.js | 40 ++++- lib/bigquery/job.js | 27 ++- lib/bigquery/table.js | 12 ++ lib/common/stream-router.js | 119 +++++++++----- lib/datastore/query.js | 10 ++ lib/datastore/request.js | 11 ++ lib/pubsub/index.js | 30 +++- lib/search/index-class.js | 27 +++ lib/search/index.js | 12 ++ lib/storage/bucket.js | 12 ++ lib/storage/index.js | 12 ++ package.json | 3 +- system-test/bigquery.js | 54 ++++++ system-test/datastore.js | 27 +++ system-test/search.js | 28 ++++ system-test/storage.js | 22 ++- test/common/stream-router.js | 307 ++++++++++++++++++++++++++++------- test/datastore/query.js | 11 ++ 18 files changed, 659 insertions(+), 105 deletions(-) diff --git a/lib/bigquery/index.js b/lib/bigquery/index.js index 306f6b44aba..462552d91f6 100644 --- a/lib/bigquery/index.js +++ b/lib/bigquery/index.js @@ -162,6 +162,8 @@ BigQuery.prototype.dataset = function(id) { * * @param {object=} query - Configuration object. * @param {boolean} query.all - List all datasets, including hidden ones. + * @param {boolean} query.autoPaginate - Have pagination handled automatically. + * Default: false. * @param {number} query.maxResults - Maximum number of results to return. * @param {string} query.pageToken - Token returned from a previous call, to * request the next page of results. @@ -178,6 +180,16 @@ BigQuery.prototype.dataset = function(id) { * bigquery.getDatasets(callback); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * bigquery.getDatasets({ + * autoPaginate: true + * }, function(err, datasets) { + * // Called after all datasets have been retrieved. + * }); + * + * //- * // Get the datasets from your project as a readable object stream. * //- * bigquery.getDatasets() @@ -238,6 +250,8 @@ BigQuery.prototype.getDatasets = function(query, callback) { * @param {object=} options - Configuration object. * @param {boolean=} options.allUsers - Display jobs owned by all users in the * project. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: false. * @param {number=} options.maxResults - Maximum number of results to return. * @param {string=} options.pageToken - Token returned from a previous call, to * request the next page of results. @@ -254,6 +268,15 @@ BigQuery.prototype.getDatasets = function(query, callback) { * }); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * bigquery.getJobs({ + * autoPaginate: true + * }, function(err, jobs) { + * // Called after all jobs have been retrieved. + * }); + * //- * // Get the jobs from your project as a readable object stream. * //- * bigquery.getJobs() @@ -336,6 +359,8 @@ BigQuery.prototype.job = function(id) { * queries for you, pushing each row to the stream. * * @param {string|object} options - A string SQL query or configuration object. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: false. * @param {number} options.maxResults - Maximum number of results to read. * @param {string} options.query - A query string, following the BigQuery query * syntax, of the query to execute. @@ -343,9 +368,7 @@ BigQuery.prototype.job = function(id) { * complete, in milliseconds, before returning. Default is to return * immediately. If the timeout passes before the job completes, the request * will fail with a `TIMEOUT` error. - * @param {function=} callback - The callback function. If you intend to - * continuously run this query until all results are in as part of a stream, - * do not pass a callback. + * @param {function=} callback - The callback function. * * @example * var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100'; @@ -364,6 +387,17 @@ BigQuery.prototype.job = function(id) { * bigquery.query(query, callback); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * bigquery.query({ + * query: query, + * autoPaginate: true + * }, function(err, rows) { + * // Called after all rows have been retrieved. + * }); + * + * //- * // You can also use the `query` method as a readable object stream by * // omitting the callback. * //- diff --git a/lib/bigquery/job.js b/lib/bigquery/job.js index 87c1e481e10..2d27c8964bd 100644 --- a/lib/bigquery/job.js +++ b/lib/bigquery/job.js @@ -91,6 +91,8 @@ Job.prototype.getMetadata = function(callback) { * Get the results of a job. * * @param {object=} options - Configuration object. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: false. * @param {number} options.maxResults - Maximum number of results to read. * @param {string} options.pageToken - Page token, returned by a previous call, * to request the next page of results. Note: This is automatically added to @@ -105,19 +107,34 @@ Job.prototype.getMetadata = function(callback) { * do not pass a callback. * * @example + * var callback = function(err, rows, nextQuery, apiResponse) { + * if (nextQuery) { + * // More results exist. + * job.getQueryResults(nextQuery, callback); + * } + * }; + * * //- * // Use the default options to get the results of a query. * //- - * job.getQueryResults(function(err, rows, nextQuery, apiResponse) {}); + * job.getQueryResults(callback); * * //- * // Customize the results you want to fetch. * //- - * var options = { + * job.getQueryResults({ * maxResults: 100 - * }; + * }, callback); * - * job.getQueryResults(options, function(err, rows, nextQuery, apiResponse) {}); + * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * job.getQueryResults({ + * autoPaginate: true + * }, function(err, rows) { + * // Called after all rows have been retrieved. + * }); * * //- * // Consume the results from the query as a readable object stream. @@ -125,7 +142,7 @@ Job.prototype.getMetadata = function(callback) { * var through2 = require('through2'); * var fs = require('fs'); * - * job.getQueryResults(options) + * job.getQueryResults() * .pipe(through2.obj(function (row, enc, next) { * this.push(JSON.stringify(row) + '\n'); * })) diff --git a/lib/bigquery/table.js b/lib/bigquery/table.js index 627036bb395..94a53923391 100644 --- a/lib/bigquery/table.js +++ b/lib/bigquery/table.js @@ -483,6 +483,8 @@ Table.prototype.getMetadata = function(callback) { * your callback as an array of objects matching your table's schema. * * @param {object=} options - The configuration object. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: false. * @param {number} options.maxResults - Maximum number of results to return. * @param {function} callback - The callback function. * @@ -501,6 +503,16 @@ Table.prototype.getMetadata = function(callback) { * table.getRows(options, callback); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * table.getRows({ + * autoPaginate: true + * }, function(err, rows) { + * // Called after all rows have been retrieved. + * }); + * + * //- * // Get the rows as a readable object stream. * //- * table.getRows(options) diff --git a/lib/common/stream-router.js b/lib/common/stream-router.js index 45a95ae8d83..51dde04b9f3 100644 --- a/lib/common/stream-router.js +++ b/lib/common/stream-router.js @@ -20,6 +20,7 @@ 'use strict'; +var concat = require('concat-stream'); var streamEvents = require('stream-events'); var through = require('through2'); @@ -60,36 +61,100 @@ streamRouter.extend = function(Class, methodNames) { var originalMethod = Class.prototype[methodName]; Class.prototype[methodName] = function() { - return streamRouter.router_(arguments, originalMethod.bind(this)); + var parsedArguments = streamRouter.parseArguments_(arguments); + return streamRouter.router_(parsedArguments, originalMethod.bind(this)); }; }); }; /** - * The router accepts all incoming arguments to the overwritten method. If the - * last argument is a function, simply pass them through to the original method. - * If the last argument is not a function, activate stream mode. + * Parse a pseudo-array `arguments` for a query and callback. * - * Stream mode simply calls the nextQuery recursively. The stream ends when - * `nextQuery` is null. + * @param {array} args - The original `arguments` pseduo-array that the original + * method received. + */ +streamRouter.parseArguments_ = function(args) { + var parsedArguments = {}; + + var firstArgument = args[0]; + var lastArgument = args[args.length - 1]; + + if (util.is(firstArgument, 'function')) { + parsedArguments.callback = firstArgument; + } else { + parsedArguments.query = firstArgument; + } + + if (util.is(lastArgument, 'function')) { + parsedArguments.callback = lastArgument; + } + + return parsedArguments; +}; + +/** + * The router accepts a query and callback that were passed to the overwritten + * method. If there's a callback, simply pass the query and/or callback through + * to the original method. If there isn't a callback. stream mode is activated. * - * @param {array} args - The original `arguments` pseudo-array as it was - * received by the original method. + * @param {array} parsedArguments - Parsed arguments from the original method + * call. + * @param {object=|string=} parsedArguments.query - Query object. This is most + * commonly an object, but to make the API more simple, it can also be a + * string in some places. + * @param {function=} parsedArguments.callback - Callback function. * @param {function} originalMethod - The cached method that accepts a callback * and returns `nextQuery` to receive more results. * @return {undefined|stream} */ -streamRouter.router_ = function(args, originalMethod) { - args = util.toArray(args); +streamRouter.router_ = function(parsedArguments, originalMethod) { + var query = parsedArguments.query || {}; + var callback = parsedArguments.callback; + + if (callback) { + if (query.autoPaginate === true || query.autoPaginateVal === true) { + delete query.autoPaginate; + delete query.autoPaginateVal; + + this.runAsStream_(query, originalMethod) + .on('error', callback) + .pipe(concat(function(results) { + callback(null, results); + })); + } else { + originalMethod(query, callback); + } + } else { + return this.runAsStream_(query, originalMethod); + } +}; - var firstArgument = args[0]; - var lastArgument = args[args.length - 1]; +/** + * This method simply calls the nextQuery recursively, emitting results to a + * stream. The stream ends when `nextQuery` is null. + * + * `maxResults` and `limitVal` (from Datastore) will act as a cap for how many + * results are fetched and emitted to the stream. + * + * @param {object=|string=} query - Query object. This is most + * commonly an object, but to make the API more simple, it can also be a + * string in some places. + * @param {function} originalMethod - The cached method that accepts a callback + * and returns `nextQuery` to receive more results. + * @return {stream} - Readable object stream. + */ +streamRouter.runAsStream_ = function(query, originalMethod) { + query = query || {}; - var isStreamMode = !util.is(lastArgument, 'function'); + var resultsToSend = -1; - if (!isStreamMode) { - originalMethod.apply(null, args); - return; + // Check if the user only asked for a certain amount of results. + if (util.is(query.maxResults, 'number')) { + // `maxResults` is used API-wide. + resultsToSend = query.maxResults; + } else if (util.is(query.limitVal, 'number')) { + // `limitVal` is part of a Datastore query. + resultsToSend = query.limitVal; } var stream = streamEvents(through.obj()); @@ -106,19 +171,6 @@ streamRouter.router_ = function(args, originalMethod) { _end.apply(this, arguments); }; - var resultsToSend = -1; - if (util.is(firstArgument, 'object')) { - // `firstArgument` is a query object. Check if the user only asked for a - // certain amount of results. - if (util.is(firstArgument.maxResults, 'number')) { - // `maxResults` is used API-wide. - resultsToSend = firstArgument.maxResults; - } else if (util.is(firstArgument.limitVal, 'number')) { - // `limitVal` is part of a Datastore query. - resultsToSend = firstArgument.limitVal; - } - } - function onResultSet(err, results, nextQuery) { if (err) { stream.emit('error', err); @@ -149,14 +201,7 @@ streamRouter.router_ = function(args, originalMethod) { } stream.once('reading', function() { - if (util.is(lastArgument, 'undefined')) { - // Replace it with onResultSet. - args.splice(args.length - 1, 1, onResultSet); - } else { - args = args.concat(onResultSet); - } - - originalMethod.apply(null, args); + originalMethod.call(null, query, onResultSet); }); return stream; diff --git a/lib/datastore/query.js b/lib/datastore/query.js index f3edda66d19..8c502c2dee3 100644 --- a/lib/datastore/query.js +++ b/lib/datastore/query.js @@ -73,12 +73,22 @@ function Query(namespace, kinds) { this.selectVal = []; // pagination + this.autoPaginateVal = false; this.startVal = null; this.endVal = null; this.limitVal = -1; this.offsetVal = -1; } +/** + * @return {module:datastore/query} + */ +Query.prototype.autoPaginate = function(autoPaginateVal) { + var query = extend(new Query(), this); + query.autoPaginateVal = autoPaginateVal !== false; + return query; +}; + /** * Datastore allows querying on properties. Supported comparison operators * are `=`, `<`, `>`, `<=`, and `>=`. "Not equal" and `IN` operators are diff --git a/lib/datastore/request.js b/lib/datastore/request.js index 244436ebe94..929568973a7 100644 --- a/lib/datastore/request.js +++ b/lib/datastore/request.js @@ -503,6 +503,17 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * transaction.runQuery(query, callback); * * //- + * // To have pagination handled for you, call `autoPaginate()`. Note the + * // changed callback parameters. + * //- + * + * var queryWithAutoPagination = dataset.createQuery('Lion').autoPaginate(); + * + * transaction.runQuery(queryWithAutoPagination, function(err, entities) { + * // Called after all entities have been retrieved. + * }); + * + * //- * // If you omit the callback, runQuery will automatically call subsequent * // queries until no results remain. Entity objects will be pushed as they are * // found. diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index fcdd1f0a781..75b4e526fb0 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -106,6 +106,8 @@ function PubSub(options) { * provide a query object as the first argument to customize the response. * * @param {object=} query - Query object. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: false. * @param {string=} query.pageToken - Page token. * @param {number=} query.pageSize - Max number of results to return. * @param {function} callback - The callback function. @@ -129,6 +131,16 @@ function PubSub(options) { * }, function(err, topics, nextQuery, apiResponse) {}); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * pubsub.getTopics({ + * autoPaginate: true + * }, function(err, topics) { + * // Called after all topics have been retrieved. + * }); + * + * //- * // Get the topics as a readable object stream. * //- * pubsub.getTopics() @@ -407,6 +419,8 @@ PubSub.prototype.topic = function(name, options) { * To get subscriptions for a topic, see {module:pubsub/topic}. * * @param {object=} options - Configuration object. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: false. * @param {string|module:pubsub/topic} options.topic - The name of the topic to * list subscriptions from. * @param {number} options.pageSize - Maximum number of results to return. @@ -422,8 +436,10 @@ PubSub.prototype.topic = function(name, options) { * * @example * var callback = function(err, subscriptions, nextQuery, apiResponse) { - * // If `nextQuery` is non-null, there may be more results to fetch. To do - * // so, run `pubsub.getSubscriptions(nextQuery, callback);`. + * if (nextQuery) { + * // More results exist. + * pubsub.getSubscriptions(nextQuery, callback); + * } * }; * * //- @@ -439,6 +455,16 @@ PubSub.prototype.topic = function(name, options) { * }, callback); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * pubsub.getSubscriptions({ + * autoPaginate: true + * }, function(err, subscriptions) { + * // Called after all subscriptions have been retrieved. + * }); + * + * //- * // Get the subscriptions as a readable object stream. * //- * pubsub.getSubscriptions() diff --git a/lib/search/index-class.js b/lib/search/index-class.js index 90cd318df2f..b261d0c70fb 100644 --- a/lib/search/index-class.js +++ b/lib/search/index-class.js @@ -148,6 +148,8 @@ Index.prototype.document = function(id) { * Get {module:search/document} objects for all of the documents in this index. * * @param {object=} query - Query object. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: false. * @param {string} query.pageSize - The maximum number of documents to return * per page. If not specified, 100 documents are returned per page. * @param {string} query.pageToken - A previously-returned page token @@ -180,6 +182,16 @@ Index.prototype.document = function(id) { * }, onApiResponse); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * index.getDocuments({ + * autoPaginate: true + * }, function(err, documents) { + * // Called after all documents have been retrieved. + * }); + * + * //- * // Get the documents as a readable object stream. * //- * index.getDocuments() @@ -238,6 +250,8 @@ Index.prototype.getDocuments = function(query, callback) { * @throws {error} If a query string or object isn't provided. * * @param {string|object} query - A query object or simply a string query. + * @param {boolean} query.autoPaginate - Have pagination handled automatically. + * Default: false. * @param {string} query.pageSize - The maximum number of documents to return * per page. If not specified, 100 documents are returned per page. * @param {string} query.pageToken - A previously-returned page token @@ -278,6 +292,19 @@ Index.prototype.getDocuments = function(query, callback) { * index.search(query, onApiResponse); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * var query = { + * query: 'person:stephen', + * autoPaginate: true + * }; + * + * index.search(query, function(err, indexes) { + * // Called after all indexes have been retrieved. + * }); + * + * //- * // Get the documents that match your query as a readable object stream. * //- * index.search('person:stephen') diff --git a/lib/search/index.js b/lib/search/index.js index 0e59745b256..56ea03107f2 100644 --- a/lib/search/index.js +++ b/lib/search/index.js @@ -96,6 +96,8 @@ function Search(options) { * Get {module:search/index} objects for all of the indexes in your project. * * @param {object=} query - Query object. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: false. * @param {string} query.pageSize - The maximum number of indexes to return per * page. If not specified, 100 indexes are returned per page. * @param {string} query.pageToken - A previously-returned page token @@ -128,6 +130,16 @@ function Search(options) { * }, onApiResponse); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * search.getIndexes({ + * autoPaginate: true + * }, function(err, indexes) { + * // Called after all indexes have been retrieved. + * }); + * + * //- * // Get the indexes as a readable object stream. * //- * search.getIndexes() diff --git a/lib/storage/bucket.js b/lib/storage/bucket.js index 69ecf1d69e6..823e20da651 100644 --- a/lib/storage/bucket.js +++ b/lib/storage/bucket.js @@ -455,6 +455,8 @@ Bucket.prototype.file = function(name, options) { * Get File objects for the files currently in the bucket. * * @param {object=} query - Query object. + * @param {boolean} query.autoPaginate - Have pagination handled automatically. + * Default: false. * @param {string} query.delimiter - Results will contain only objects whose * names, aside from the prefix, do not contain delimiter. Objects whose * names, aside from the prefix, contain delimiter will have their name @@ -504,6 +506,16 @@ Bucket.prototype.file = function(name, options) { * }); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * bucket.getFiles({ + * autoPaginate: true + * }, function(err, files) { + * // Called after all files have been retrieved. + * }); + * + * //- * // Get the files from your bucket as a readable object stream. * //- * bucket.getFiles() diff --git a/lib/storage/index.js b/lib/storage/index.js index 2100e231121..6ba1bdc1246 100644 --- a/lib/storage/index.js +++ b/lib/storage/index.js @@ -249,6 +249,8 @@ Storage.prototype.createBucket = function(name, metadata, callback) { * Get Bucket objects for all of the buckets in your project. * * @param {object=} query - Query object. + * @param {boolean} options.autoPaginate - Have pagination handled + * automatically. Default: false. * @param {number} query.maxResults - Maximum number of items plus prefixes to * return. * @param {string} query.pageToken - A previously-returned page token @@ -280,6 +282,16 @@ Storage.prototype.createBucket = function(name, metadata, callback) { * }, function(err, buckets, nextQuery, apiResponse) {}); * * //- + * // To have pagination handled for you, set `autoPaginate`. Note the changed + * // callback parameters. + * //- + * gcs.getBuckets({ + * autoPaginate: true + * }, function(err, buckets) { + * // Called after all buckets have been retrieved. + * }); + * + * //- * // Get the buckets from your project as a readable object stream. * //- * gcs.getBuckets() diff --git a/package.json b/package.json index af9980bed48..26a1246d99c 100644 --- a/package.json +++ b/package.json @@ -50,16 +50,17 @@ "dependencies": { "async": "^0.9.0", "buffer-equal": "0.0.1", + "concat-stream": "^1.5.0", "configstore": "^0.3.2", "duplexify": "^3.2.0", "extend": "^2.0.0", - "sse4_crc32": "^3.1.0", "google-auth-library": "^0.9.4", "mime-types": "^2.0.8", "node-uuid": "^1.4.2", "once": "^1.3.1", "protobufjs": "^3.8.2", "request": "^2.53.0", + "sse4_crc32": "^3.1.0", "stream-events": "^1.0.1", "through2": "^0.6.3" }, diff --git a/system-test/bigquery.js b/system-test/bigquery.js index 7e2a4e2fcf4..3b70c172596 100644 --- a/system-test/bigquery.js +++ b/system-test/bigquery.js @@ -138,6 +138,14 @@ describe('BigQuery', function() { }); }); + it('should list datasets with autoPaginate', function(done) { + bigquery.getDatasets({ autoPaginate: true }, function(err, datasets) { + assert(datasets.length > 0); + assert(datasets[0] instanceof Dataset); + done(); + }); + }); + it('should list datasets as a stream', function(done) { var datasetEmitted = false; @@ -165,6 +173,25 @@ describe('BigQuery', function() { }); }); + it('should get query results as a stream', function(done) { + bigquery.startQuery(query, function(err, job) { + assert.ifError(err); + + var rowsEmitted = []; + + job.getQueryResults() + .on('error', done) + .on('data', function(row) { + rowsEmitted.push(row); + }) + .on('end', function() { + assert.equal(rowsEmitted.length, 100); + assert.equal(typeof rowsEmitted[0].url, 'string'); + done(); + }); + }); + }); + it('should query as a stream', function(done) { var rowsEmitted = 0; @@ -192,6 +219,17 @@ describe('BigQuery', function() { }); }); + it('should query with autoPaginate', function(done) { + bigquery.query({ + query: query, + autoPaginate: true + }, function(err, rows) { + assert.ifError(err); + assert.equal(rows.length, 100); + done(); + }); + }); + it('should get a list of jobs', function(done) { bigquery.getJobs(function(err, jobs) { assert.ifError(err); @@ -200,6 +238,14 @@ describe('BigQuery', function() { }); }); + it('should list jobs with autoPaginate', function(done) { + bigquery.getJobs({ autoPaginate: true }, function(err, jobs) { + assert.ifError(err); + assert(jobs[0] instanceof Job); + done(); + }); + }); + it('should list jobs as a stream', function(done) { var jobEmitted = false; @@ -252,6 +298,14 @@ describe('BigQuery', function() { }); }); + it('should get the rows in a table with autoPaginate', function(done) { + table.getRows({ autoPaginate: true }, function(err, rows) { + assert.ifError(err); + assert(Array.isArray(rows)); + done(); + }); + }); + it('should get the rows in a table via stream', function(done) { table.getRows() .on('error', done) diff --git a/system-test/datastore.js b/system-test/datastore.js index 48853bb3fbd..33b6a5c3ea1 100644 --- a/system-test/datastore.js +++ b/system-test/datastore.js @@ -299,6 +299,33 @@ describe('datastore', function() { }); }); + it('should run query with autoPaginate', function(done) { + var q = ds.createQuery('Character') + .hasAncestor(ancestor) + .autoPaginate(); + + ds.runQuery(q, function(err, results) { + assert.ifError(err); + assert.equal(results.length, characters.length); + done(); + }); + }); + + it('should not go over a limit with autoPaginate', function(done) { + var limit = 3; + + var q = ds.createQuery('Character') + .hasAncestor(ancestor) + .limit(limit) + .autoPaginate(); + + ds.runQuery(q, function(err, results) { + assert.ifError(err); + assert.equal(results.length, limit); + done(); + }); + }); + it('should run a query as a stream', function(done) { var q = ds.createQuery('Character').hasAncestor(ancestor); diff --git a/system-test/search.js b/system-test/search.js index c5e517530aa..6b8035b8464 100644 --- a/system-test/search.js +++ b/system-test/search.js @@ -141,6 +141,14 @@ describe('Search', function() { }); }); + it('should get all indexes with autoPaginate', function(done) { + search.getIndexes({ autoPaginate: true }, function(err, indexes) { + assert.ifError(err); + assert(indexes.length > 0); + done(); + }); + }); + it('should get all indexes in stream mode', function(done) { var resultsMatched = 0; @@ -189,6 +197,14 @@ describe('Search', function() { }); }); + it('should get all documents with autoPaginate', function(done) { + index.getDocuments({ autoPaginate: true }, function(err, documents) { + assert.ifError(err); + assert.strictEqual(documents.length, 1); + done(); + }); + }); + it('should get all documents in stream mode', function(done) { var resultsMatched = 0; @@ -285,6 +301,18 @@ describe('Search', function() { }); }); + it('should search document with autoPaginate', function(done) { + index.search({ + query: query, + autoPaginate: true + }, function(err, results) { + assert.ifError(err); + assert.equal(results.length, 1); + assert.equal(results[0].id, DOCUMENT_NAME); + done(); + }); + }); + it('should search document in stream mode', function(done) { var results = []; diff --git a/system-test/storage.js b/system-test/storage.js index 67aab6d66d6..c46720bd05c 100644 --- a/system-test/storage.js +++ b/system-test/storage.js @@ -422,6 +422,18 @@ describe('storage', function() { } }); + it('should get buckets with autoPaginate', function(done) { + storage.getBuckets({ + autoPaginate: true + }, function(err, buckets) { + assert.ifError(err); + + assert(buckets.length > 0); + assert(buckets[0] instanceof Bucket); + done(); + }); + }); + it('should get buckets as a stream', function(done) { var bucketEmitted = false; @@ -715,6 +727,15 @@ describe('storage', function() { }); }); + it('should get files with autoPaginate', function(done) { + bucket.getFiles({ autoPaginate: true }, function(err, files) { + assert.ifError(err); + assert.strictEqual(files.length, filenames.length); + assert(files[0] instanceof File); + done(); + }); + }); + it('should get files as a stream', function(done) { var fileEmitted = false; @@ -793,7 +814,6 @@ describe('storage', function() { }); }); }); - }); it('should get all files scoped to their version', function(done) { diff --git a/test/common/stream-router.js b/test/common/stream-router.js index 86ef7f7530d..003537f0745 100644 --- a/test/common/stream-router.js +++ b/test/common/stream-router.js @@ -19,6 +19,7 @@ var assert = require('assert'); var extend = require('extend'); var mockery = require('mockery'); +var through = require('through2'); var util = require('../../lib/common/util.js'); var uuid = require('node-uuid'); @@ -87,17 +88,34 @@ describe('streamRouter', function() { assert.notEqual(anotherMethod, FakeClass.prototype.anotherMethodToExtend); }); + it('should parse the arguments', function(done) { + streamRouterOverrides.parseArguments_ = function(args) { + assert.deepEqual([].slice.call(args), [1, 2, 3]); + done(); + }; + + streamRouterOverrides.router_ = util.noop; + + streamRouter.extend(FakeClass, 'methodToExtend'); + FakeClass.prototype.methodToExtend(1, 2, 3); + }); + it('should call router when the original method is called', function(done) { var expectedReturnValue = FakeClass.prototype.methodToExtend(); + var parsedArguments = { a: 'b', c: 'd' }; + + streamRouterOverrides.parseArguments_ = function() { + return parsedArguments; + }; streamRouterOverrides.router_ = function(args, originalMethod) { - assert.deepEqual([].slice.call(args), [1, 2, 3]); + assert.strictEqual(args, parsedArguments); assert.equal(originalMethod(), expectedReturnValue); done(); }; streamRouter.extend(FakeClass, 'methodToExtend'); - FakeClass.prototype.methodToExtend(1, 2, 3); + FakeClass.prototype.methodToExtend(); }); it('should maintain `this` context', function(done) { @@ -126,55 +144,251 @@ describe('streamRouter', function() { }); }); + describe('parseArguments_', function() { + it('should detect a callback if first argument is a function', function() { + var args = [ util.noop ]; + var parsedArguments = streamRouter.parseArguments_(args); + + assert.strictEqual(parsedArguments.callback, args[0]); + }); + + it('should use any other first argument as query', function() { + var args = [ 'string' ]; + var parsedArguments = streamRouter.parseArguments_(args); + + assert.strictEqual(parsedArguments.query, args[0]); + }); + + it('should detect a callback if last argument is a function', function() { + var args = [ 'string', util.noop ]; + var parsedArguments = streamRouter.parseArguments_(args); + + assert.strictEqual(parsedArguments.callback, args[1]); + }); + + it('should not assign a callback if a fn is not provided', function() { + var args = [ 'string' ]; + var parsedArguments = streamRouter.parseArguments_(args); + + assert.strictEqual(parsedArguments.callback, undefined); + }); + }); + describe('router_', function() { - var ARGS_WITHOUT_CALLBACK = [1, 2, 3]; - var ARGS_WITH_CALLBACK = ARGS_WITHOUT_CALLBACK.concat(util.noop); + beforeEach(function() { + streamRouterOverrides.runAsStream_ = util.noop; + }); - describe('stream mode', function() { - it('should call original method when stream opens', function(done) { - function originalMethod() { - var args = arguments; + describe('callback mode', function() { + describe('autoPaginate', function() { + it('should recognize autoPaginate', function(done) { + var parsedArguments = { + query: { + autoPaginate: true + }, + callback: util.noop + }; + + streamRouterOverrides.runAsStream_ = function() { + done(); + return through(); + }; + + streamRouter.router_(parsedArguments, util.noop); + }); + + it('should recognize autoPaginateVal', function(done) { + var parsedArguments = { + query: { + autoPaginateVal: true + }, + callback: util.noop + }; + + streamRouterOverrides.runAsStream_ = function() { + done(); + return through(); + }; - ARGS_WITHOUT_CALLBACK.forEach(function(arg, index) { - assert.strictEqual(args[index], arg); + streamRouter.router_(parsedArguments, util.noop); + }); + + it('should delete the autoPaginate property', function(done) { + var parsedArguments = { + query: { + autoPaginate: true + }, + callback: util.noop + }; + + streamRouterOverrides.runAsStream_ = function(query) { + assert.strictEqual(query.autoPaginate, undefined); + done(); + return through(); + }; + + streamRouter.router_(parsedArguments, util.noop); + }); + + it('should delete the autoPaginateVal property', function(done) { + var parsedArguments = { + query: { + autoPaginateVal: true + }, + callback: util.noop + }; + + streamRouterOverrides.runAsStream_ = function(query) { + assert.strictEqual(query.autoPaginateVal, undefined); + done(); + return through(); + }; + + streamRouter.router_(parsedArguments, util.noop); + }); + + it('should runAsStream', function(done) { + var parsedArguments = { + query: { + autoPaginate: true + }, + callback: util.noop + }; + + streamRouterOverrides.runAsStream_ = function(query, originalMethod) { + assert.deepEqual(query, {}); + originalMethod(); + return through(); + }; + + streamRouter.router_(parsedArguments, done); + }); + + it('should execute callback on error', function(done) { + var error = new Error('Error.'); + + var parsedArguments = { + query: { + autoPaginate: true + }, + callback: function(err) { + assert.strictEqual(err, error); + done(); + } + }; + + streamRouterOverrides.runAsStream_ = function() { + var stream = through(); + setImmediate(function() { + stream.emit('error', error); + }); + return stream; + }; + + streamRouter.router_(parsedArguments, util.noop); + }); + + it('should return all results on end', function(done) { + var results = ['a', 'b', 'c']; + + var parsedArguments = { + query: { + autoPaginate: true + }, + callback: function(err, results_) { + assert.deepEqual(results_.toString().split(''), results); + done(); + } + }; + + streamRouterOverrides.runAsStream_ = function() { + var stream = through(); + + setImmediate(function() { + results.forEach(function(result) { + stream.push(result); + }); + + stream.push(null); + }); + + return stream; + }; + + streamRouter.router_(parsedArguments, util.noop); + }); + }); + + describe('manual pagination', function() { + it('should call original method', function(done) { + var parsedArguments = { + query: { a: 'b', c: 'd' }, + callback: done + }; + + streamRouter.router_(parsedArguments, function(query, callback) { + assert.deepEqual(query, parsedArguments.query); + callback(); }); + }); + }); + }); - // The callback should have been appended to the original arguments. - assert.strictEqual(args.length, ARGS_WITHOUT_CALLBACK.length + 1); + describe('stream mode', function() { + it('should call runAsStream_', function(done) { + var parsedArguments = { + query: { a: 'b', c: 'd' } + }; - done(); - } + streamRouterOverrides.runAsStream_ = function(query, originalMethod) { + assert.deepEqual(query, parsedArguments.query); + originalMethod(); + }; - var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod); - rs.on('data', util.noop); // Trigger the underlying `_read` event. + streamRouter.router_(parsedArguments, done); }); - it('should replace last argument if it is undefined', function(done) { - var argsWithHangingUndefined = ARGS_WITHOUT_CALLBACK.concat(undefined); + it('should return the value of runAsStream_', function() { + var parsedArguments = { + query: { a: 'b', c: 'd' } + }; - function originalMethod() { - // If the last argument was replaced, the arguments array length will - // not have increased. - assert.strictEqual(arguments.length, argsWithHangingUndefined.length); + var stream = through(); + streamRouterOverrides.runAsStream_ = function() { + return stream; + }; + + var stream_ = streamRouter.router_(parsedArguments, assert.ifError); + assert.strictEqual(stream_, stream); + }); + }); + }); + + describe('runAsStream_', function() { + describe('stream mode', function() { + var QUERY = { a: 'b', c: 'd' }; + + it('should call original method when stream opens', function(done) { + function originalMethod(query) { + assert.strictEqual(query, QUERY); done(); } - var rs = streamRouter.router_(argsWithHangingUndefined, originalMethod); + var rs = streamRouter.runAsStream_(QUERY, originalMethod); rs.on('data', util.noop); // Trigger the underlying `_read` event. }); it('should emit an error if one occurs', function(done) { var error = new Error('Error.'); - function originalMethod() { - var callback = [].slice.call(arguments).pop(); + function originalMethod(query, callback) { setImmediate(function() { callback(error); }); } - var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod); + var rs = streamRouter.runAsStream_(QUERY, originalMethod); rs.on('data', util.noop); // Trigger the underlying `_read` event. rs.on('error', function(err) { assert.deepEqual(err, error); @@ -186,14 +400,13 @@ describe('streamRouter', function() { var results = ['a', 'b', 'c']; var resultsReceived = []; - function originalMethod() { - var callback = [].slice.call(arguments).pop(); + function originalMethod(query, callback) { setImmediate(function() { callback(null, results); }); } - var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod); + var rs = streamRouter.runAsStream_(QUERY, originalMethod); rs.on('data', function(result) { resultsReceived.push(result); }); @@ -206,8 +419,7 @@ describe('streamRouter', function() { describe('limits', function() { var limit = 1; - function originalMethod() { - var callback = [].slice.call(arguments).pop(); + function originalMethod(query, callback) { setImmediate(function() { callback(null, [1, 2, 3]); }); @@ -216,7 +428,7 @@ describe('streamRouter', function() { it('should respect query.maxResults', function(done) { var numResultsReceived = 0; - streamRouter.router_([{ maxResults: limit }], originalMethod) + streamRouter.runAsStream_({ maxResults: limit }, originalMethod) .on('data', function() { numResultsReceived++; }) .on('end', function() { assert.strictEqual(numResultsReceived, limit); @@ -227,7 +439,7 @@ describe('streamRouter', function() { it('should respect query.limitVal', function(done) { var numResultsReceived = 0; - streamRouter.router_([{ limitVal: limit }], originalMethod) + streamRouter.runAsStream_({ limitVal: limit }, originalMethod) .on('data', function() { numResultsReceived++; }) .on('end', function() { assert.strictEqual(numResultsReceived, limit); @@ -240,10 +452,7 @@ describe('streamRouter', function() { var nextQuery = { a: 'b', c: 'd' }; var nextQuerySent = false; - function originalMethod() { - var query = arguments[0]; - var callback = [].slice.call(arguments).pop(); - + function originalMethod(query, callback) { if (nextQuerySent) { assert.deepEqual(query, nextQuery); done(); @@ -256,21 +465,20 @@ describe('streamRouter', function() { }); } - var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod); + var rs = streamRouter.runAsStream_(QUERY, originalMethod); rs.on('data', util.noop); // Trigger the underlying `_read` event. }); it('should not push more results if stream ends early', function(done) { var results = ['a', 'b', 'c']; - function originalMethod() { - var callback = [].slice.call(arguments).pop(); + function originalMethod(query, callback) { setImmediate(function() { callback(null, results); }); } - var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod); + var rs = streamRouter.runAsStream_(QUERY, originalMethod); rs.on('data', function(result) { if (result === 'b') { // Pre-maturely end the stream. @@ -289,17 +497,15 @@ describe('streamRouter', function() { var originalMethodCalledCount = 0; - function originalMethod() { + function originalMethod(query, callback) { originalMethodCalledCount++; - var callback = [].slice.call(arguments).pop(); - setImmediate(function() { callback(null, results, {}); }); } - var rs = streamRouter.router_(ARGS_WITHOUT_CALLBACK, originalMethod); + var rs = streamRouter.runAsStream_(QUERY, originalMethod); rs.on('data', function(result) { if (result === 'b') { // Pre-maturely end the stream. @@ -312,16 +518,5 @@ describe('streamRouter', function() { }); }); }); - - describe('callback mode', function() { - it('should call original method', function(done) { - function originalMethod() { - assert.deepEqual([].slice.call(arguments), ARGS_WITH_CALLBACK); - done(); - } - - streamRouter.router_(ARGS_WITH_CALLBACK, originalMethod); - }); - }); }); }); diff --git a/test/datastore/query.js b/test/datastore/query.js index 6dd3079bd39..a4d8ddd51d9 100644 --- a/test/datastore/query.js +++ b/test/datastore/query.js @@ -43,6 +43,17 @@ describe('Query', function() { assert.equal(query.namespace, 'ns'); }); + it('should default autoPaginate to false', function() { + var query = new Query(['kind1']); + assert.strictEqual(query.autoPaginateVal, false); + }); + + it('should support setting autoPagination', function() { + var query = new Query(['kind1']) + .autoPaginate(); + assert.strictEqual(query.autoPaginateVal, true); + }); + it('should support field selection by field name', function() { var query = new Query(['kind1']) .select(['name', 'title']); From 3c379dfbe8ee5274ccf1162fc52bac7724cc570d Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Tue, 14 Jul 2015 00:13:16 -0400 Subject: [PATCH 9/9] streamrouter: default `autoPaginate: true` --- lib/bigquery/index.js | 75 ++++++++------- lib/bigquery/job.js | 2 +- lib/bigquery/table.js | 26 +++-- lib/common/stream-router.js | 76 +++++++++------ lib/common/util.js | 30 +++++- lib/datastore/query.js | 2 +- lib/datastore/request.js | 25 ++--- lib/pubsub/index.js | 28 +++--- lib/search/index-class.js | 82 +++++++--------- lib/search/index.js | 29 +++--- lib/storage/bucket.js | 51 +++++----- lib/storage/index.js | 36 ++++--- system-test/bigquery.js | 26 +++-- system-test/datastore.js | 15 +-- system-test/pubsub.js | 8 +- system-test/search.js | 52 ++-------- system-test/storage.js | 33 ++----- test/common/stream-router.js | 178 ++++++++++++++++------------------- test/common/util.js | 83 +++++++++++++--- test/datastore/query.js | 10 +- 20 files changed, 439 insertions(+), 428 deletions(-) diff --git a/lib/bigquery/index.js b/lib/bigquery/index.js index 462552d91f6..3a128efd090 100644 --- a/lib/bigquery/index.js +++ b/lib/bigquery/index.js @@ -163,13 +163,23 @@ BigQuery.prototype.dataset = function(id) { * @param {object=} query - Configuration object. * @param {boolean} query.all - List all datasets, including hidden ones. * @param {boolean} query.autoPaginate - Have pagination handled automatically. - * Default: false. + * Default: true. * @param {number} query.maxResults - Maximum number of results to return. * @param {string} query.pageToken - Token returned from a previous call, to * request the next page of results. * @param {function} callback - The callback function. * * @example + * bigquery.getDatasets(function(err, datasets) { + * if (!err) { + * // datasets is an array of Dataset objects. + * } + * }); + * + * //- + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. + * //- * var callback = function(err, datasets, nextQuery, apiResponse) { * if (nextQuery) { * // More results exist. @@ -177,17 +187,9 @@ BigQuery.prototype.dataset = function(id) { * } * }; * - * bigquery.getDatasets(callback); - * - * //- - * // To have pagination handled for you, set `autoPaginate`. Note the changed - * // callback parameters. - * //- * bigquery.getDatasets({ - * autoPaginate: true - * }, function(err, datasets) { - * // Called after all datasets have been retrieved. - * }); + * autoPaginate: false + * }, callback); * * //- * // Get the datasets from your project as a readable object stream. @@ -251,7 +253,7 @@ BigQuery.prototype.getDatasets = function(query, callback) { * @param {boolean=} options.allUsers - Display jobs owned by all users in the * project. * @param {boolean} options.autoPaginate - Have pagination handled - * automatically. Default: false. + * automatically. Default: true. * @param {number=} options.maxResults - Maximum number of results to return. * @param {string=} options.pageToken - Token returned from a previous call, to * request the next page of results. @@ -263,19 +265,27 @@ BigQuery.prototype.getDatasets = function(query, callback) { * @param {function} callback - The callback function. * * @example - * bigquery.getJobs(function(err, jobs, nextQuery, apiResponse) { - * // If `nextQuery` is non-null, there are more results to fetch. + * bigquery.getJobs(function(err, jobs) { + * if (!err) { + * // jobs is an array of Job objects. + * } * }); * * //- - * // To have pagination handled for you, set `autoPaginate`. Note the changed - * // callback parameters. + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. * //- + * var callback = function(err, jobs, nextQuery, apiRespose) { + * if (nextQuery) { + * // More results exist. + * bigquery.getJobs(nextQuery, callback); + * } + * }; + * * bigquery.getJobs({ - * autoPaginate: true - * }, function(err, jobs) { - * // Called after all jobs have been retrieved. - * }); + * autoPaginate: false + * }, callback); + * * //- * // Get the jobs from your project as a readable object stream. * //- @@ -360,7 +370,7 @@ BigQuery.prototype.job = function(id) { * * @param {string|object} options - A string SQL query or configuration object. * @param {boolean} options.autoPaginate - Have pagination handled - * automatically. Default: false. + * automatically. Default: true. * @param {number} options.maxResults - Maximum number of results to read. * @param {string} options.query - A query string, following the BigQuery query * syntax, of the query to execute. @@ -373,29 +383,26 @@ BigQuery.prototype.job = function(id) { * @example * var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100'; * + * bigquery.query(query, function(err, rows) { + * if (!err) { + * // Handle results here. + * } + * }); + * * //- - * // You can run a query against your data in a serial manner. + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. * //- * var callback = function(err, rows, nextQuery, apiResponse) { - * // Handle results here. - * * if (nextQuery) { * bigquery.query(nextQuery, callback); * } * }; * - * bigquery.query(query, callback); - * - * //- - * // To have pagination handled for you, set `autoPaginate`. Note the changed - * // callback parameters. - * //- * bigquery.query({ * query: query, - * autoPaginate: true - * }, function(err, rows) { - * // Called after all rows have been retrieved. - * }); + * autoPaginate: false + * }, callback); * * //- * // You can also use the `query` method as a readable object stream by diff --git a/lib/bigquery/job.js b/lib/bigquery/job.js index 2d27c8964bd..881f786951b 100644 --- a/lib/bigquery/job.js +++ b/lib/bigquery/job.js @@ -92,7 +92,7 @@ Job.prototype.getMetadata = function(callback) { * * @param {object=} options - Configuration object. * @param {boolean} options.autoPaginate - Have pagination handled - * automatically. Default: false. + * automatically. Default: true. * @param {number} options.maxResults - Maximum number of results to read. * @param {string} options.pageToken - Page token, returned by a previous call, * to request the next page of results. Note: This is automatically added to diff --git a/lib/bigquery/table.js b/lib/bigquery/table.js index 94a53923391..7e400c8cd1e 100644 --- a/lib/bigquery/table.js +++ b/lib/bigquery/table.js @@ -484,15 +484,21 @@ Table.prototype.getMetadata = function(callback) { * * @param {object=} options - The configuration object. * @param {boolean} options.autoPaginate - Have pagination handled - * automatically. Default: false. + * automatically. Default: true. * @param {number} options.maxResults - Maximum number of results to return. * @param {function} callback - The callback function. * * @example - * var options = { - * maxResults: 100 - * }; + * table.getRows(function(err, rows) { + * if (!err) { + * // Handle results here. + * } + * }); * + * //- + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. + * //- * var callback = function(err, rows, nextQuery, apiResponse) { * if (nextQuery) { * // More results exist. @@ -500,17 +506,9 @@ Table.prototype.getMetadata = function(callback) { * } * }; * - * table.getRows(options, callback); - * - * //- - * // To have pagination handled for you, set `autoPaginate`. Note the changed - * // callback parameters. - * //- * table.getRows({ - * autoPaginate: true - * }, function(err, rows) { - * // Called after all rows have been retrieved. - * }); + * autoPaginate: false + * }, callback); * * //- * // Get the rows as a readable object stream. diff --git a/lib/common/stream-router.js b/lib/common/stream-router.js index 51dde04b9f3..f4d2708fbde 100644 --- a/lib/common/stream-router.js +++ b/lib/common/stream-router.js @@ -74,22 +74,51 @@ streamRouter.extend = function(Class, methodNames) { * method received. */ streamRouter.parseArguments_ = function(args) { - var parsedArguments = {}; + var query; + var callback; + var maxResults = -1; + var autoPaginate = true; var firstArgument = args[0]; var lastArgument = args[args.length - 1]; if (util.is(firstArgument, 'function')) { - parsedArguments.callback = firstArgument; + callback = firstArgument; } else { - parsedArguments.query = firstArgument; + query = firstArgument; } if (util.is(lastArgument, 'function')) { - parsedArguments.callback = lastArgument; + callback = lastArgument; } - return parsedArguments; + if (util.is(query, 'object')) { + // Check if the user only asked for a certain amount of results. + if (util.is(query.maxResults, 'number')) { + // `maxResults` is used API-wide. + maxResults = query.maxResults; + } else if (util.is(query.limitVal, 'number')) { + // `limitVal` is part of a Datastore query. + maxResults = query.limitVal; + } else if (util.is(query.pageSize, 'number')) { + // `pageSize` is Pub/Sub's `maxResults`. + maxResults = query.pageSize; + } + + if (callback && + (maxResults !== -1 || // The user specified a limit. + query.autoPaginate === false || + query.autoPaginateVal === false)) { + autoPaginate = false; + } + } + + return { + query: query || {}, + callback: callback, + maxResults: maxResults, + autoPaginate: autoPaginate + }; }; /** @@ -103,20 +132,20 @@ streamRouter.parseArguments_ = function(args) { * commonly an object, but to make the API more simple, it can also be a * string in some places. * @param {function=} parsedArguments.callback - Callback function. + * @param {boolean} parsedArguments.autoPaginate - Auto-pagination enabled. + * @param {number} parsedArguments.maxResults - Maximum results to return. * @param {function} originalMethod - The cached method that accepts a callback * and returns `nextQuery` to receive more results. * @return {undefined|stream} */ streamRouter.router_ = function(parsedArguments, originalMethod) { - var query = parsedArguments.query || {}; + var query = parsedArguments.query; var callback = parsedArguments.callback; + var autoPaginate = parsedArguments.autoPaginate; if (callback) { - if (query.autoPaginate === true || query.autoPaginateVal === true) { - delete query.autoPaginate; - delete query.autoPaginateVal; - - this.runAsStream_(query, originalMethod) + if (autoPaginate) { + this.runAsStream_(parsedArguments, originalMethod) .on('error', callback) .pipe(concat(function(results) { callback(null, results); @@ -125,7 +154,7 @@ streamRouter.router_ = function(parsedArguments, originalMethod) { originalMethod(query, callback); } } else { - return this.runAsStream_(query, originalMethod); + return this.runAsStream_(parsedArguments, originalMethod); } }; @@ -136,26 +165,19 @@ streamRouter.router_ = function(parsedArguments, originalMethod) { * `maxResults` and `limitVal` (from Datastore) will act as a cap for how many * results are fetched and emitted to the stream. * - * @param {object=|string=} query - Query object. This is most + * @param {object=|string=} parsedArguments.query - Query object. This is most * commonly an object, but to make the API more simple, it can also be a * string in some places. + * @param {function=} parsedArguments.callback - Callback function. + * @param {boolean} parsedArguments.autoPaginate - Auto-pagination enabled. + * @param {number} parsedArguments.maxResults - Maximum results to return. * @param {function} originalMethod - The cached method that accepts a callback * and returns `nextQuery` to receive more results. * @return {stream} - Readable object stream. */ -streamRouter.runAsStream_ = function(query, originalMethod) { - query = query || {}; - - var resultsToSend = -1; - - // Check if the user only asked for a certain amount of results. - if (util.is(query.maxResults, 'number')) { - // `maxResults` is used API-wide. - resultsToSend = query.maxResults; - } else if (util.is(query.limitVal, 'number')) { - // `limitVal` is part of a Datastore query. - resultsToSend = query.limitVal; - } +streamRouter.runAsStream_ = function(parsedArguments, originalMethod) { + var query = parsedArguments.query; + var resultsToSend = parsedArguments.maxResults; var stream = streamEvents(through.obj()); @@ -201,7 +223,7 @@ streamRouter.runAsStream_ = function(query, originalMethod) { } stream.once('reading', function() { - originalMethod.call(null, query, onResultSet); + originalMethod(query, onResultSet); }); return stream; diff --git a/lib/common/util.js b/lib/common/util.js index e58e6113758..6c89bea50f7 100644 --- a/lib/common/util.js +++ b/lib/common/util.js @@ -674,13 +674,12 @@ function makeRequest(reqOpts, config, callback) { config = config || {}; + reqOpts = util.decorateRequest(reqOpts); + var MAX_RETRIES = config.maxRetries || 3; var autoRetry = config.autoRetry !== false ? true : false; var attemptedRetries = 0; - reqOpts.headers = reqOpts.headers || {}; - reqOpts.headers['User-Agent'] = USER_AGENT; - function shouldRetry(err) { return autoRetry && MAX_RETRIES > attemptedRetries && @@ -704,3 +703,28 @@ function makeRequest(reqOpts, config, callback) { } util.makeRequest = makeRequest; + +/** + * Decorate the options about to be made in a request. + * + * @param {object} reqOpts - The options to be passed to `request`. + * @return {object} reqOpts - The decorated reqOpts. + */ +function decorateRequest(reqOpts) { + reqOpts.headers = reqOpts.headers || {}; + reqOpts.headers['User-Agent'] = USER_AGENT; + + if (util.is(reqOpts.qs, 'object')) { + delete reqOpts.qs.autoPaginate; + delete reqOpts.qs.autoPaginateVal; + } + + if (util.is(reqOpts.json, 'object')) { + delete reqOpts.json.autoPaginate; + delete reqOpts.json.autoPaginateVal; + } + + return reqOpts; +} + +util.decorateRequest = decorateRequest; diff --git a/lib/datastore/query.js b/lib/datastore/query.js index 8c502c2dee3..0015be8770d 100644 --- a/lib/datastore/query.js +++ b/lib/datastore/query.js @@ -73,7 +73,7 @@ function Query(namespace, kinds) { this.selectVal = []; // pagination - this.autoPaginateVal = false; + this.autoPaginateVal = true; this.startVal = null; this.endVal = null; this.limitVal = -1; diff --git a/lib/datastore/request.js b/lib/datastore/request.js index 929568973a7..84d1d4e98a7 100644 --- a/lib/datastore/request.js +++ b/lib/datastore/request.js @@ -493,6 +493,18 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * //- * var query = dataset.createQuery('Lion'); * + * transaction.runQuery(query, function(err, entities) { + * if (!err) { + * // Handle entities here. + * } + * }); + * + * //- + * // To control how many API requests are made and page through the results + * // manually, call `autoPaginate(false)` on your query. + * //- + * var manualPageQuery = dataset.createQuery('Lion').autoPaginate(false); + * * var callback = function(err, entities, nextQuery, apiResponse) { * if (nextQuery) { * // More results might exist. @@ -500,18 +512,7 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * } * }; * - * transaction.runQuery(query, callback); - * - * //- - * // To have pagination handled for you, call `autoPaginate()`. Note the - * // changed callback parameters. - * //- - * - * var queryWithAutoPagination = dataset.createQuery('Lion').autoPaginate(); - * - * transaction.runQuery(queryWithAutoPagination, function(err, entities) { - * // Called after all entities have been retrieved. - * }); + * transaction.runQuery(manualPageQuery, callback); * * //- * // If you omit the callback, runQuery will automatically call subsequent diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 75b4e526fb0..70ad17f8ed5 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -107,13 +107,13 @@ function PubSub(options) { * * @param {object=} query - Query object. * @param {boolean} options.autoPaginate - Have pagination handled - * automatically. Default: false. + * automatically. Default: true. * @param {string=} query.pageToken - Page token. * @param {number=} query.pageSize - Max number of results to return. * @param {function} callback - The callback function. * @param {?error} callback.err - An error from the API call, may be null. * @param {module:pubsub/topic[]} callback.topics - The list of topics returned. - * @param {object} callback.nextQuery - A query object representing the next + * @param {?object} callback.nextQuery - A query object representing the next * page of topics. * @param {object} callback.apiResponse - The full API response from the * service. @@ -420,7 +420,7 @@ PubSub.prototype.topic = function(name, options) { * * @param {object=} options - Configuration object. * @param {boolean} options.autoPaginate - Have pagination handled - * automatically. Default: false. + * automatically. Default: true. * @param {string|module:pubsub/topic} options.topic - The name of the topic to * list subscriptions from. * @param {number} options.pageSize - Maximum number of results to return. @@ -429,12 +429,22 @@ PubSub.prototype.topic = function(name, options) { * @param {?error} callback.err - An error from the API call, may be null. * @param {module:pubsub/subscription[]} callback.subscriptions - The list of * subscriptions returned. - * @param {object} callback.nextQuery - A query object representing the next + * @param {?object} callback.nextQuery - A query object representing the next * page of topics. * @param {object} callback.apiResponse - The full API response from the * service. * * @example + * pubsub.getSubscriptions(function(err, subscriptions) { + * if (!err) { + * // subscriptions is an array of Subscription objects. + * } + * }); + * + * //- + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. + * //- * var callback = function(err, subscriptions, nextQuery, apiResponse) { * if (nextQuery) { * // More results exist. @@ -442,16 +452,8 @@ PubSub.prototype.topic = function(name, options) { * } * }; * - * //- - * // Get all subscriptions for this project. - * //- - * pubsub.getSubscriptions(callback); - * - * //- - * // Customize the query. - * //- * pubsub.getSubscriptions({ - * pageSize: 3 + * autoPaginate: false, * }, callback); * * //- diff --git a/lib/search/index-class.js b/lib/search/index-class.js index b261d0c70fb..e5366d7b683 100644 --- a/lib/search/index-class.js +++ b/lib/search/index-class.js @@ -149,7 +149,7 @@ Index.prototype.document = function(id) { * * @param {object=} query - Query object. * @param {boolean} options.autoPaginate - Have pagination handled - * automatically. Default: false. + * automatically. Default: true. * @param {string} query.pageSize - The maximum number of documents to return * per page. If not specified, 100 documents are returned per page. * @param {string} query.pageToken - A previously-returned page token @@ -159,39 +159,34 @@ Index.prototype.document = function(id) { * @param {function} callback - The callback function. * * @example + * index.getDocuments(function(err, documents) { + * if (!err) { + * // documents is an array of Document objects. + * } + * }); + * + * //- + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. + * //- * function onApiResponse(err, documents, nextQuery, apiResponse) { * if (err) { * console.error(err); * return; * } * - * // `documents` is an array of Document objects in this index. + * // `documents` is an array of Document objects. * * if (nextQuery) { * index.getDocuments(nextQuery, onApiResponse); * } * } * - * index.getDocuments(onApiResponse); - * - * //- - * // Customize the request using a query object. - * //- * index.getDocuments({ - * pageSize: 10 + * autoPaginate: false * }, onApiResponse); * * //- - * // To have pagination handled for you, set `autoPaginate`. Note the changed - * // callback parameters. - * //- - * index.getDocuments({ - * autoPaginate: true - * }, function(err, documents) { - * // Called after all documents have been retrieved. - * }); - * - * //- * // Get the documents as a readable object stream. * //- * index.getDocuments() @@ -251,7 +246,7 @@ Index.prototype.getDocuments = function(query, callback) { * * @param {string|object} query - A query object or simply a string query. * @param {boolean} query.autoPaginate - Have pagination handled automatically. - * Default: false. + * Default: true. * @param {string} query.pageSize - The maximum number of documents to return * per page. If not specified, 100 documents are returned per page. * @param {string} query.pageToken - A previously-returned page token @@ -261,53 +256,40 @@ Index.prototype.getDocuments = function(query, callback) { * @param {function} callback - The callback function. * * @example + * var query = 'person:stephen'; + * + * index.search(query, function(err, documents) { + * if (!err) { + * // `documents` is an array of Document objects. + * } + * }); + * + * //- + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. + * //- * function onApiResponse(err, documents, nextQuery, apiResponse) { * if (err) { * console.error(err); * return; * } * - * // `documents` is an array of Document objects that matched your query. + * // `documents` is an array of Document objects. * * if (nextQuery) { * index.search(nextQuery, onApiResponse); * } * } * - * //- - * // Run a simple query against all documents. - * //- - * var query = 'person:stephen'; - * - * index.search(query, onApiResponse); - * - * //- - * // Configure the query. - * //- - * var query = { - * query: 'person:stephen', - * pageSize: 10 - * }; - * - * index.search(query, onApiResponse); - * - * //- - * // To have pagination handled for you, set `autoPaginate`. Note the changed - * // callback parameters. - * //- - * var query = { - * query: 'person:stephen', - * autoPaginate: true - * }; - * - * index.search(query, function(err, indexes) { - * // Called after all indexes have been retrieved. - * }); + * index.search({ + * autoPaginate: false, + * query: query + * }, onApiResponse); * * //- * // Get the documents that match your query as a readable object stream. * //- - * index.search('person:stephen') + * index.search(query) * .on('error', console.error) * .on('data', function(document) { * // document is a Document object. diff --git a/lib/search/index.js b/lib/search/index.js index 56ea03107f2..75f109bceb4 100644 --- a/lib/search/index.js +++ b/lib/search/index.js @@ -97,7 +97,7 @@ function Search(options) { * * @param {object=} query - Query object. * @param {boolean} options.autoPaginate - Have pagination handled - * automatically. Default: false. + * automatically. Default: true. * @param {string} query.pageSize - The maximum number of indexes to return per * page. If not specified, 100 indexes are returned per page. * @param {string} query.pageToken - A previously-returned page token @@ -109,37 +109,32 @@ function Search(options) { * @param {function} callback - The callback function. * * @example + * search.getIndexes(function(err, indexes) { + * // indexes is an array of Index objects. + * }); + * + * //- + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. + * //- * function onApiResponse(err, indexes, nextQuery, apiResponse) { * if (err) { * console.error(err); * return; * } * + * // indexes is an array of Index objects. + * * if (nextQuery) { * search.getIndexes(nextQuery, onApiResponse); * } * } * - * search.getIndexes(onApiResponse); - * - * //- - * // Customize the request using a query object. - * //- * search.getIndexes({ - * pageSize: 10 + * autoPaginate: false * }, onApiResponse); * * //- - * // To have pagination handled for you, set `autoPaginate`. Note the changed - * // callback parameters. - * //- - * search.getIndexes({ - * autoPaginate: true - * }, function(err, indexes) { - * // Called after all indexes have been retrieved. - * }); - * - * //- * // Get the indexes as a readable object stream. * //- * search.getIndexes() diff --git a/lib/storage/bucket.js b/lib/storage/bucket.js index 823e20da651..d8e87249666 100644 --- a/lib/storage/bucket.js +++ b/lib/storage/bucket.js @@ -456,7 +456,7 @@ Bucket.prototype.file = function(name, options) { * * @param {object=} query - Query object. * @param {boolean} query.autoPaginate - Have pagination handled automatically. - * Default: false. + * Default: true. * @param {string} query.delimiter - Results will contain only objects whose * names, aside from the prefix, do not contain delimiter. Objects whose * names, aside from the prefix, contain delimiter will have their name @@ -473,47 +473,44 @@ Bucket.prototype.file = function(name, options) { * @param {function} callback - The callback function. * * @example - * bucket.getFiles(function(err, files, nextQuery, apiResponse) { - * if (nextQuery) { - * // nextQuery will be non-null if there are more results. - * bucket.getFiles(nextQuery, function(err, files, nextQ, apiResponse) {}); + * bucket.getFiles(function(err, files) { + * if (!err) { + * // files is an array of File objects. * } - * - * // The `metadata` property is populated for you with the metadata at the - * // time of fetching. - * files[0].metadata; - * - * // However, in cases where you are concerned the metadata could have - * // changed, use the `getMetadata` method. - * files[0].getMetadata(function(err, metadata) {}); * }); * * //- - * // Fetch using a query. - * //- - * bucket.getFiles({ - * maxResults: 5 - * }, function(err, files, nextQuery, apiResponse) {}); - * - * //- * // If your bucket has versioning enabled, you can get all of your files * // scoped to their generation. * //- * bucket.getFiles({ * versions: true - * }, function(err, files, nextQuery, apiResponse) { + * }, function(err, files) { * // Each file is scoped to its generation. * }); * * //- - * // To have pagination handled for you, set `autoPaginate`. Note the changed - * // callback parameters. + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. * //- + * var callback = function(err, files, nextQuery, apiResponse) { + * if (nextQuery) { + * // More results exist. + * bucket.getFiles(nextQuery, callback); + * } + * + * // The `metadata` property is populated for you with the metadata at the + * // time of fetching. + * files[0].metadata; + * + * // However, in cases where you are concerned the metadata could have + * // changed, use the `getMetadata` method. + * files[0].getMetadata(function(err, metadata) {}); + * }; + * * bucket.getFiles({ - * autoPaginate: true - * }, function(err, files) { - * // Called after all files have been retrieved. - * }); + * autoPaginate: false + * }, callback); * * //- * // Get the files from your bucket as a readable object stream. diff --git a/lib/storage/index.js b/lib/storage/index.js index 6ba1bdc1246..3c9dee7f287 100644 --- a/lib/storage/index.js +++ b/lib/storage/index.js @@ -250,7 +250,7 @@ Storage.prototype.createBucket = function(name, metadata, callback) { * * @param {object=} query - Query object. * @param {boolean} options.autoPaginate - Have pagination handled - * automatically. Default: false. + * automatically. Default: true. * @param {number} query.maxResults - Maximum number of items plus prefixes to * return. * @param {string} query.pageToken - A previously-returned page token @@ -258,10 +258,19 @@ Storage.prototype.createBucket = function(name, metadata, callback) { * @param {function} callback - The callback function. * * @example - * gcs.getBuckets(function(err, buckets, nextQuery) { + * gcs.getBuckets(function(err, buckets) { + * if (!err) { + * // buckets is an array of Bucket objects. + * } + * }); + * + * //- + * // To control how many API requests are made and page through the results + * // manually, set `autoPaginate` to `false`. + * //- + * var callback = function(err, buckets, nextQuery) { * if (nextQuery) { - * // nextQuery will be non-null if there are more results. - * var callback = function(err, buckets, nextQuery, apiResponse){}; + * // More results exist. * gcs.getBuckets(nextQuery, callback); * } * @@ -272,24 +281,11 @@ Storage.prototype.createBucket = function(name, metadata, callback) { * // However, in cases where you are concerned the metadata could have * // changed, use the `getMetadata` method. * buckets[0].getMetadata(function(err, metadata, apiResponse) {}); - * }); - * - * //- - * // Fetch using a query. - * //- - * gcs.getBuckets({ - * maxResults: 5 - * }, function(err, buckets, nextQuery, apiResponse) {}); + * }; * - * //- - * // To have pagination handled for you, set `autoPaginate`. Note the changed - * // callback parameters. - * //- * gcs.getBuckets({ - * autoPaginate: true - * }, function(err, buckets) { - * // Called after all buckets have been retrieved. - * }); + * autoPaginate: false + * }, callback); * * //- * // Get the buckets from your project as a readable object stream. diff --git a/system-test/bigquery.js b/system-test/bigquery.js index 3b70c172596..dbb7c001a81 100644 --- a/system-test/bigquery.js +++ b/system-test/bigquery.js @@ -139,7 +139,7 @@ describe('BigQuery', function() { }); it('should list datasets with autoPaginate', function(done) { - bigquery.getDatasets({ autoPaginate: true }, function(err, datasets) { + bigquery.getDatasets(function(err, datasets) { assert(datasets.length > 0); assert(datasets[0] instanceof Dataset); done(); @@ -166,6 +166,7 @@ describe('BigQuery', function() { assert(job instanceof Job); job.getQueryResults(function(err, rows) { + assert.ifError(err); assert.equal(rows.length, 100); assert.equal(typeof rows[0].url, 'string'); done(); @@ -207,25 +208,22 @@ describe('BigQuery', function() { }); }); - it('should allow querying in series', function(done) { - bigquery.query({ - query: query, - maxResults: 10 - }, function(err, rows, nextQuery) { + it('should query', function(done) { + bigquery.query(query, function(err, rows) { assert.ifError(err); - assert.equal(rows.length, 10); - assert.equal(typeof nextQuery.pageToken, 'string'); + assert.equal(rows.length, 100); done(); }); }); - it('should query with autoPaginate', function(done) { + it('should allow querying in series', function(done) { bigquery.query({ query: query, - autoPaginate: true - }, function(err, rows) { + maxResults: 10 + }, function(err, rows, nextQuery) { assert.ifError(err); - assert.equal(rows.length, 100); + assert.equal(rows.length, 10); + assert.equal(typeof nextQuery.pageToken, 'string'); done(); }); }); @@ -239,7 +237,7 @@ describe('BigQuery', function() { }); it('should list jobs with autoPaginate', function(done) { - bigquery.getJobs({ autoPaginate: true }, function(err, jobs) { + bigquery.getJobs(function(err, jobs) { assert.ifError(err); assert(jobs[0] instanceof Job); done(); @@ -299,7 +297,7 @@ describe('BigQuery', function() { }); it('should get the rows in a table with autoPaginate', function(done) { - table.getRows({ autoPaginate: true }, function(err, rows) { + table.getRows(function(err, rows) { assert.ifError(err); assert(Array.isArray(rows)); done(); diff --git a/system-test/datastore.js b/system-test/datastore.js index 33b6a5c3ea1..1fff93925ee 100644 --- a/system-test/datastore.js +++ b/system-test/datastore.js @@ -280,7 +280,8 @@ describe('datastore', function() { }); it('should limit queries', function(done) { - var q = ds.createQuery('Character').hasAncestor(ancestor).limit(5); + var q = ds.createQuery('Character').hasAncestor(ancestor).limit(5) + .autoPaginate(false); ds.runQuery(q, function(err, firstEntities, secondQuery) { assert.ifError(err); @@ -301,8 +302,7 @@ describe('datastore', function() { it('should run query with autoPaginate', function(done) { var q = ds.createQuery('Character') - .hasAncestor(ancestor) - .autoPaginate(); + .hasAncestor(ancestor); ds.runQuery(q, function(err, results) { assert.ifError(err); @@ -316,8 +316,7 @@ describe('datastore', function() { var q = ds.createQuery('Character') .hasAncestor(ancestor) - .limit(limit) - .autoPaginate(); + .limit(limit); ds.runQuery(q, function(err, results) { assert.ifError(err); @@ -429,7 +428,8 @@ describe('datastore', function() { .hasAncestor(ancestor) .offset(2) .limit(3) - .order('appearances'); + .order('appearances') + .autoPaginate(false); ds.runQuery(q, function(err, entities, secondQuery) { assert.ifError(err); @@ -454,7 +454,8 @@ describe('datastore', function() { .hasAncestor(ancestor) .offset(2) .limit(2) - .order('appearances'); + .order('appearances') + .autoPaginate(false); ds.runQuery(q, function(err, entities, nextQuery) { assert.ifError(err); diff --git a/system-test/pubsub.js b/system-test/pubsub.js index 51221bcd5cc..f772251acf6 100644 --- a/system-test/pubsub.js +++ b/system-test/pubsub.js @@ -95,14 +95,14 @@ describe('pubsub', function() { }); }); - it('should return a nextQuery if there are more results', function(done) { + it('should allow manual paging', function(done) { pubsub.getTopics({ pageSize: TOPIC_NAMES.length - 1 - }, function(err, topics, next) { + }, function(err, topics, nextQuery) { assert.ifError(err); assert(topics.length, TOPIC_NAMES.length - 1); - assert(next.pageSize, TOPIC_NAMES.length - 1); - assert(!!next.pageToken, true); + assert(nextQuery.pageSize, TOPIC_NAMES.length - 1); + assert(!!nextQuery.pageToken, true); done(); }); }); diff --git a/system-test/search.js b/system-test/search.js index 6b8035b8464..1083e09a914 100644 --- a/system-test/search.js +++ b/system-test/search.js @@ -34,53 +34,25 @@ function deleteDocument(document, callback) { } function deleteIndexContents(index, callback) { - function handleResp(err, documents, nextQuery) { + index.getDocuments(function(err, documents) { if (err) { callback(err); return; } - async.eachLimit(documents, MAX_PARALLEL, deleteDocument, function(err) { - if (err) { - callback(err); - return; - } - - if (nextQuery) { - index.getDocuments(nextQuery, handleResp); - return; - } - - callback(); - }); - } - - index.getDocuments(handleResp); + async.eachLimit(documents, MAX_PARALLEL, deleteDocument, callback); + }); } function deleteAllDocuments(callback) { - function handleResp(err, indexes, nextQuery) { + search.getIndexes(function(err, indexes) { if (err) { callback(err); return; } - async.eachLimit(indexes, MAX_PARALLEL, deleteIndexContents, function(err) { - if (err) { - callback(err); - return; - } - - if (nextQuery) { - search.getIndexes(nextQuery, handleResp); - return; - } - - callback(); - }); - } - - search.getIndexes(handleResp); + async.eachLimit(indexes, MAX_PARALLEL, deleteIndexContents, callback); + }); } function generateIndexName() { @@ -301,18 +273,6 @@ describe('Search', function() { }); }); - it('should search document with autoPaginate', function(done) { - index.search({ - query: query, - autoPaginate: true - }, function(err, results) { - assert.ifError(err); - assert.equal(results.length, 1); - assert.equal(results[0].id, DOCUMENT_NAME); - done(); - }); - }); - it('should search document in stream mode', function(done) { var results = []; diff --git a/system-test/storage.js b/system-test/storage.js index c46720bd05c..0e07d9d50a5 100644 --- a/system-test/storage.js +++ b/system-test/storage.js @@ -395,31 +395,14 @@ describe('storage', function() { }); it('should get buckets', function(done) { - storage.getBuckets(getBucketsHandler); - - var createdBuckets = []; - var retries = 0; - var MAX_RETRIES = 2; - - function getBucketsHandler(err, buckets, nextQuery) { - buckets.forEach(function(bucket) { - if (bucketsToCreate.indexOf(bucket.name) > -1) { - createdBuckets.push(bucket); - } + storage.getBuckets(function(err, buckets) { + var createdBuckets = buckets.filter(function(bucket) { + return bucketsToCreate.indexOf(bucket.name) > -1; }); - if (createdBuckets.length < bucketsToCreate.length && nextQuery) { - retries++; - - if (retries <= MAX_RETRIES) { - storage.getBuckets(nextQuery, getBucketsHandler); - return; - } - } - assert.equal(createdBuckets.length, bucketsToCreate.length); done(); - } + }); }); it('should get buckets with autoPaginate', function(done) { @@ -719,10 +702,9 @@ describe('storage', function() { }); it('should get files', function(done) { - bucket.getFiles(function(err, files, nextQuery) { + bucket.getFiles(function(err, files) { assert.ifError(err); assert.equal(files.length, filenames.length); - assert.equal(nextQuery, null); done(); }); }); @@ -751,7 +733,10 @@ describe('storage', function() { }); it('should paginate the list', function(done) { - var query = { maxResults: filenames.length - 1 }; + var query = { + maxResults: filenames.length - 1 + }; + bucket.getFiles(query, function(err, files, nextQuery) { assert.ifError(err); assert.equal(files.length, filenames.length - 1); diff --git a/test/common/stream-router.js b/test/common/stream-router.js index 003537f0745..deff15c8861 100644 --- a/test/common/stream-router.js +++ b/test/common/stream-router.js @@ -145,6 +145,15 @@ describe('streamRouter', function() { }); describe('parseArguments_', function() { + it('should set defaults', function() { + var parsedArguments = streamRouter.parseArguments_([]); + + assert.strictEqual(Object.keys(parsedArguments.query).length, 0); + assert.strictEqual(parsedArguments.callback, undefined); + assert.strictEqual(parsedArguments.maxResults, -1); + assert.strictEqual(parsedArguments.autoPaginate, true); + }); + it('should detect a callback if first argument is a function', function() { var args = [ util.noop ]; var parsedArguments = streamRouter.parseArguments_(args); @@ -159,6 +168,13 @@ describe('streamRouter', function() { assert.strictEqual(parsedArguments.query, args[0]); }); + it('should not make an undefined value the query', function() { + var args = [ undefined, util.noop ]; + var parsedArguments = streamRouter.parseArguments_(args); + + assert.deepEqual(parsedArguments.query, {}); + }); + it('should detect a callback if last argument is a function', function() { var args = [ 'string', util.noop ]; var parsedArguments = streamRouter.parseArguments_(args); @@ -172,91 +188,65 @@ describe('streamRouter', function() { assert.strictEqual(parsedArguments.callback, undefined); }); - }); - describe('router_', function() { - beforeEach(function() { - streamRouterOverrides.runAsStream_ = util.noop; - }); + it('should set maxResults from query.maxResults', function() { + var args = [ { maxResults: 10 } ]; + var parsedArguments = streamRouter.parseArguments_(args); - describe('callback mode', function() { - describe('autoPaginate', function() { - it('should recognize autoPaginate', function(done) { - var parsedArguments = { - query: { - autoPaginate: true - }, - callback: util.noop - }; + assert.strictEqual(parsedArguments.maxResults, args[0].maxResults); + }); - streamRouterOverrides.runAsStream_ = function() { - done(); - return through(); - }; + it('should set maxResults from query.limitVal', function() { + var args = [ { limitVal: 10 } ]; + var parsedArguments = streamRouter.parseArguments_(args); - streamRouter.router_(parsedArguments, util.noop); - }); + assert.strictEqual(parsedArguments.maxResults, args[0].limitVal); + }); - it('should recognize autoPaginateVal', function(done) { - var parsedArguments = { - query: { - autoPaginateVal: true - }, - callback: util.noop - }; + it('should set maxResults from query.pageSize', function() { + var args = [ { pageSize: 10 } ]; + var parsedArguments = streamRouter.parseArguments_(args); - streamRouterOverrides.runAsStream_ = function() { - done(); - return through(); - }; + assert.strictEqual(parsedArguments.maxResults, args[0].pageSize); + }); - streamRouter.router_(parsedArguments, util.noop); - }); + it('should set autoPaginate: false if there is a maxResults', function() { + var args = [ { maxResults: 10 }, util.noop ]; + var parsedArguments = streamRouter.parseArguments_(args); - it('should delete the autoPaginate property', function(done) { - var parsedArguments = { - query: { - autoPaginate: true - }, - callback: util.noop - }; + assert.strictEqual(parsedArguments.autoPaginate, false); + }); - streamRouterOverrides.runAsStream_ = function(query) { - assert.strictEqual(query.autoPaginate, undefined); - done(); - return through(); - }; + it('should set autoPaginate: false query.autoPaginate', function() { + var args = [ { autoPaginate: false }, util.noop ]; + var parsedArguments = streamRouter.parseArguments_(args); - streamRouter.router_(parsedArguments, util.noop); - }); + assert.strictEqual(parsedArguments.autoPaginate, false); + }); - it('should delete the autoPaginateVal property', function(done) { - var parsedArguments = { - query: { - autoPaginateVal: true - }, - callback: util.noop - }; + it('should set autoPaginate: false with query.autoPaginateVal', function() { + var args = [ { autoPaginateVal: false }, util.noop ]; + var parsedArguments = streamRouter.parseArguments_(args); - streamRouterOverrides.runAsStream_ = function(query) { - assert.strictEqual(query.autoPaginateVal, undefined); - done(); - return through(); - }; + assert.strictEqual(parsedArguments.autoPaginate, false); + }); + }); - streamRouter.router_(parsedArguments, util.noop); - }); + describe('router_', function() { + beforeEach(function() { + streamRouterOverrides.runAsStream_ = util.noop; + }); - it('should runAsStream', function(done) { + describe('callback mode', function() { + describe('autoPaginate', function() { + it('should call runAsStream_ when autoPaginate:true', function(done) { var parsedArguments = { - query: { - autoPaginate: true - }, + autoPaginate: true, callback: util.noop }; - streamRouterOverrides.runAsStream_ = function(query, originalMethod) { - assert.deepEqual(query, {}); + streamRouterOverrides.runAsStream_ = function(args, originalMethod) { + assert.strictEqual(args, parsedArguments); originalMethod(); return through(); }; @@ -268,9 +258,7 @@ describe('streamRouter', function() { var error = new Error('Error.'); var parsedArguments = { - query: { - autoPaginate: true - }, + autoPaginate: true, callback: function(err) { assert.strictEqual(err, error); done(); @@ -292,9 +280,7 @@ describe('streamRouter', function() { var results = ['a', 'b', 'c']; var parsedArguments = { - query: { - autoPaginate: true - }, + autoPaginate: true, callback: function(err, results_) { assert.deepEqual(results_.toString().split(''), results); done(); @@ -320,14 +306,19 @@ describe('streamRouter', function() { }); describe('manual pagination', function() { - it('should call original method', function(done) { + it('should recoginze autoPaginate: false', function(done) { var parsedArguments = { - query: { a: 'b', c: 'd' }, + autoPaginate: false, + query: { + a: 'b', + c: 'd' + }, callback: done }; streamRouter.router_(parsedArguments, function(query, callback) { assert.deepEqual(query, parsedArguments.query); + callback(); }); }); @@ -340,8 +331,8 @@ describe('streamRouter', function() { query: { a: 'b', c: 'd' } }; - streamRouterOverrides.runAsStream_ = function(query, originalMethod) { - assert.deepEqual(query, parsedArguments.query); + streamRouterOverrides.runAsStream_ = function(args, originalMethod) { + assert.deepEqual(args, parsedArguments); originalMethod(); }; @@ -367,15 +358,19 @@ describe('streamRouter', function() { describe('runAsStream_', function() { describe('stream mode', function() { - var QUERY = { a: 'b', c: 'd' }; + var PARSED_ARGUMENTS = { + query: { + a: 'b', c: 'd' + } + }; it('should call original method when stream opens', function(done) { function originalMethod(query) { - assert.strictEqual(query, QUERY); + assert.strictEqual(query, PARSED_ARGUMENTS.query); done(); } - var rs = streamRouter.runAsStream_(QUERY, originalMethod); + var rs = streamRouter.runAsStream_(PARSED_ARGUMENTS, originalMethod); rs.on('data', util.noop); // Trigger the underlying `_read` event. }); @@ -388,7 +383,7 @@ describe('streamRouter', function() { }); } - var rs = streamRouter.runAsStream_(QUERY, originalMethod); + var rs = streamRouter.runAsStream_(PARSED_ARGUMENTS, originalMethod); rs.on('data', util.noop); // Trigger the underlying `_read` event. rs.on('error', function(err) { assert.deepEqual(err, error); @@ -406,7 +401,7 @@ describe('streamRouter', function() { }); } - var rs = streamRouter.runAsStream_(QUERY, originalMethod); + var rs = streamRouter.runAsStream_(PARSED_ARGUMENTS, originalMethod); rs.on('data', function(result) { resultsReceived.push(result); }); @@ -425,7 +420,7 @@ describe('streamRouter', function() { }); } - it('should respect query.maxResults', function(done) { + it('should respect maxResults', function(done) { var numResultsReceived = 0; streamRouter.runAsStream_({ maxResults: limit }, originalMethod) @@ -435,17 +430,6 @@ describe('streamRouter', function() { done(); }); }); - - it('should respect query.limitVal', function(done) { - var numResultsReceived = 0; - - streamRouter.runAsStream_({ limitVal: limit }, originalMethod) - .on('data', function() { numResultsReceived++; }) - .on('end', function() { - assert.strictEqual(numResultsReceived, limit); - done(); - }); - }); }); it('should get more results if nextQuery exists', function(done) { @@ -465,7 +449,7 @@ describe('streamRouter', function() { }); } - var rs = streamRouter.runAsStream_(QUERY, originalMethod); + var rs = streamRouter.runAsStream_(PARSED_ARGUMENTS, originalMethod); rs.on('data', util.noop); // Trigger the underlying `_read` event. }); @@ -478,7 +462,7 @@ describe('streamRouter', function() { }); } - var rs = streamRouter.runAsStream_(QUERY, originalMethod); + var rs = streamRouter.runAsStream_(PARSED_ARGUMENTS, originalMethod); rs.on('data', function(result) { if (result === 'b') { // Pre-maturely end the stream. @@ -505,7 +489,7 @@ describe('streamRouter', function() { }); } - var rs = streamRouter.runAsStream_(QUERY, originalMethod); + var rs = streamRouter.runAsStream_(PARSED_ARGUMENTS, originalMethod); rs.on('data', function(result) { if (result === 'b') { // Pre-maturely end the stream. diff --git a/test/common/util.js b/test/common/util.js index b447c64a0cc..cf039ba67e6 100644 --- a/test/common/util.js +++ b/test/common/util.js @@ -851,30 +851,25 @@ describe('common/util', function() { }); describe('makeRequest', function() { - var PKG = require('../../package.json'); - var USER_AGENT = 'gcloud-node/' + PKG.version; - var reqOpts = { a: 'b', c: 'd' }; - var expectedReqOpts = extend(true, {}, reqOpts, { - headers: { - 'User-Agent': USER_AGENT - } - }); + it('should decorate the request', function(done) { + var reqOpts = { a: 'b', c: 'd' }; - it('should make a request', function(done) { - request_Override = function() { + request_Override = util.noop; + + utilOverrides.decorateRequest = function(reqOpts_) { + assert.strictEqual(reqOpts_, reqOpts); done(); }; - util.makeRequest({}, assert.ifError, {}); + util.makeRequest(reqOpts, {}, assert.ifError); }); - it('should add the user agent', function(done) { - request_Override = function(rOpts) { - assert.deepEqual(rOpts, expectedReqOpts); + it('should make a request', function(done) { + request_Override = function() { done(); }; - util.makeRequest(reqOpts, assert.ifError, {}); + util.makeRequest({}, assert.ifError, {}); }); it('should let handleResp handle the response', function(done) { @@ -1006,4 +1001,62 @@ describe('common/util', function() { }); }); }); + + describe('decorateRequest', function() { + it('should add the user agent', function() { + var PKG = require('../../package.json'); + var USER_AGENT = 'gcloud-node/' + PKG.version; + + var reqOpts = { a: 'b', c: 'd' }; + + var expectedReqOpts = extend({}, reqOpts, { + headers: { + 'User-Agent': USER_AGENT + } + }); + + var decoratedReqOpts = util.decorateRequest(reqOpts); + assert.deepEqual(decoratedReqOpts, expectedReqOpts); + }); + + it('should delete qs.autoPaginate', function() { + var decoratedReqOpts = util.decorateRequest({ + qs: { + autoPaginate: true + } + }); + + assert.strictEqual(decoratedReqOpts.autoPaginate, undefined); + }); + + it('should delete qs.autoPaginateVal', function() { + var decoratedReqOpts = util.decorateRequest({ + qs: { + autoPaginateVal: true + } + }); + + assert.strictEqual(decoratedReqOpts.autoPaginate, undefined); + }); + + it('should delete json.autoPaginate', function() { + var decoratedReqOpts = util.decorateRequest({ + json: { + autoPaginate: true + } + }); + + assert.strictEqual(decoratedReqOpts.autoPaginate, undefined); + }); + + it('should delete json.autoPaginateVal', function() { + var decoratedReqOpts = util.decorateRequest({ + json: { + autoPaginateVal: true + } + }); + + assert.strictEqual(decoratedReqOpts.autoPaginate, undefined); + }); + }); }); diff --git a/test/datastore/query.js b/test/datastore/query.js index a4d8ddd51d9..654873e04f8 100644 --- a/test/datastore/query.js +++ b/test/datastore/query.js @@ -45,15 +45,21 @@ describe('Query', function() { it('should default autoPaginate to false', function() { var query = new Query(['kind1']); - assert.strictEqual(query.autoPaginateVal, false); + assert.strictEqual(query.autoPaginateVal, true); }); - it('should support setting autoPagination', function() { + it('should default autoPaginate() to true', function() { var query = new Query(['kind1']) .autoPaginate(); assert.strictEqual(query.autoPaginateVal, true); }); + it('should support setting autoPaginate to false', function() { + var query = new Query(['kind1']) + .autoPaginate(false); + assert.strictEqual(query.autoPaginateVal, false); + }); + it('should support field selection by field name', function() { var query = new Query(['kind1']) .select(['name', 'title']);