Skip to content

Commit

Permalink
Merge pull request #692 from stephenplusplus/spp--implement-streamrouter
Browse files Browse the repository at this point in the history
streamrouter: implement across library
  • Loading branch information
stephenplusplus committed Jul 15, 2015
2 parents 7dc8310 + 3c379df commit 82bd654
Show file tree
Hide file tree
Showing 28 changed files with 1,558 additions and 764 deletions.
265 changes: 151 additions & 114 deletions lib/bigquery/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
'use strict';

var extend = require('extend');
var streamEvents = require('stream-events');
var through = require('through2');

/**
* @type {module:bigquery/dataset}
Expand All @@ -36,6 +34,12 @@ var Dataset = require('./dataset.js');
*/
var Job = require('./job.js');

/**
* @type {module:common/streamrouter}
* @private
*/
var streamRouter = require('../common/stream-router.js');

/**
* @type {module:bigquery/table}
* @private
Expand Down Expand Up @@ -158,15 +162,55 @@ BigQuery.prototype.dataset = function(id) {
*
* @param {object=} query - Configuration object.
* @param {boolean} query.all - List all datasets, including hidden ones.
* @param {boolean} query.autoPaginate - Have pagination handled automatically.
* Default: true.
* @param {number} query.maxResults - Maximum number of results to return.
* @param {string} query.pageToken - Token returned from a previous call, to
* request the next page of results.
* @param {function} callback - The callback function.
*
* @example
* bigquery.getDatasets(function(err, datasets, nextQuery, apiResponse) {
* // If `nextQuery` is non-null, there are more results to fetch.
* bigquery.getDatasets(function(err, datasets) {
* if (!err) {
* // datasets is an array of Dataset objects.
* }
* });
*
* //-
* // To control how many API requests are made and page through the results
* // manually, set `autoPaginate` to `false`.
* //-
* var callback = function(err, datasets, nextQuery, apiResponse) {
* if (nextQuery) {
* // More results exist.
* bigquery.getDatasets(nextQuery, callback);
* }
* };
*
* bigquery.getDatasets({
* autoPaginate: false
* }, callback);
*
* //-
* // Get the datasets from your project as a readable object stream.
* //-
* bigquery.getDatasets()
* .on('error', console.error)
* .on('data', function(dataset) {
* // dataset is a Dataset object.
* })
* .on('end', function() {
* // All datasets retrieved.
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* bigquery.getDatasets()
* .on('data', function(dataset) {
* this.end();
* });
*/
BigQuery.prototype.getDatasets = function(query, callback) {
var that = this;
Expand Down Expand Up @@ -208,6 +252,8 @@ BigQuery.prototype.getDatasets = function(query, callback) {
* @param {object=} options - Configuration object.
* @param {boolean=} options.allUsers - Display jobs owned by all users in the
* project.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number=} options.maxResults - Maximum number of results to return.
* @param {string=} options.pageToken - Token returned from a previous call, to
* request the next page of results.
Expand All @@ -219,9 +265,47 @@ BigQuery.prototype.getDatasets = function(query, callback) {
* @param {function} callback - The callback function.
*
* @example
* bigquery.getJobs(function(err, jobs, nextQuery, apiResponse) {
* // If `nextQuery` is non-null, there are more results to fetch.
* bigquery.getJobs(function(err, jobs) {
* if (!err) {
* // jobs is an array of Job objects.
* }
* });
*
* //-
* // To control how many API requests are made and page through the results
* // manually, set `autoPaginate` to `false`.
* //-
 * var callback = function(err, jobs, nextQuery, apiResponse) {
* if (nextQuery) {
* // More results exist.
* bigquery.getJobs(nextQuery, callback);
* }
* };
*
* bigquery.getJobs({
* autoPaginate: false
* }, callback);
*
* //-
* // Get the jobs from your project as a readable object stream.
* //-
* bigquery.getJobs()
* .on('error', console.error)
* .on('data', function(job) {
* // job is a Job object.
* })
* .on('end', function() {
* // All jobs retrieved.
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* bigquery.getJobs()
* .on('data', function(job) {
* this.end();
* });
*/
BigQuery.prototype.getJobs = function(options, callback) {
var that = this;
Expand Down Expand Up @@ -270,31 +354,6 @@ BigQuery.prototype.job = function(id) {
return new Job(this, id);
};

/*! Developer Documentation
*
* The `query` method is dual-purpose, like the use cases for a query.
* Sometimes, a user will want to fetch results from their table in a serial
* manner (get results -> more results exist? -> get more results, repeat.) --
* other times, a user may want to wave their hands at making repeated calls to
* get all of the rows, instead using a stream.
*
* A couple different libraries are used to cover the stream case:
*
* var stream = streamEvents(through2.obj());
*
* - streamEvents - https://github.com/stephenplusplus/stream-events
* This library enables us to wait until our stream is being asked for
* data, before making any API calls. It is possible a user will get a
* stream, then not end up running it - or, it will be run later, at a
* time when the token returned from the API call could have expired.
* Using this library ensures we wait until the last possible chance to
* get that token.
*
* - through2 - https://github.com/rvagg/through2
* This is a popular library for how simple it makes dealing with the
* complicated Node.js Streams API. We're creating an object stream, as
* the data we are receiving from the API are rows of JSON data.
*/
/**
* Run a query scoped to your project.
*
Expand All @@ -310,48 +369,56 @@ BigQuery.prototype.job = function(id) {
* queries for you, pushing each row to the stream.
*
* @param {string|object} options - A string SQL query or configuration object.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number} options.maxResults - Maximum number of results to read.
* @param {string} options.query - A query string, following the BigQuery query
* syntax, of the query to execute.
* @param {number} options.timeoutMs - How long to wait for the query to
* complete, in milliseconds, before returning. Default is to return
* immediately. If the timeout passes before the job completes, the request
* will fail with a `TIMEOUT` error.
* @param {function=} callback - The callback function. If you intend to
* continuously run this query until all results are in as part of a stream,
* do not pass a callback.
* @param {function=} callback - The callback function.
*
* @example
* var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100';
*
* bigquery.query(query, function(err, rows) {
* if (!err) {
* // Handle results here.
* }
* });
*
* //-
* // You can run a query against your data in a serial manner.
* // To control how many API requests are made and page through the results
* // manually, set `autoPaginate` to `false`.
* //-
* bigquery.query(query, function(err, rows, nextQuery, apiResponse) {
* // Handle results here.
* var callback = function(err, rows, nextQuery, apiResponse) {
* if (nextQuery) {
* bigquery.query(nextQuery, function(err, rows, nextQuery, apiResponse) {
* // Handle more results here.
* });
* bigquery.query(nextQuery, callback);
* }
* });
* };
*
* bigquery.query({
* query: query,
* autoPaginate: false
* }, callback);
*
* //-
* // You can also use the `query` method as a readable object stream by
* // omitting the callback.
* //-
* var through2 = require('through2');
*
* bigquery.query(query)
* .pipe(through2.obj(function(row, enc, next) {
* this.push(row.url += '?trackingCode=AZ19b\n');
* next();
* }))
* .pipe(process.stdout);
* .on('error', console.error)
* .on('data', function(row) {
* // row is a result from your query.
* })
* .on('end', function() {
* // All rows retrieved.
* });
*/
BigQuery.prototype.query = function(options, callback) {
var that = this;
var stream;

if (util.is(options, 'string')) {
options = {
Expand All @@ -366,79 +433,42 @@ BigQuery.prototype.query = function(options, callback) {
var requestQuery = extend({}, options);
delete requestQuery.job;

if (!util.is(callback, 'function')) {
stream = streamEvents(through.obj());
stream.once('reading', runQuery);
return stream;
if (job) {
// Get results of the query.
var path = '/queries/' + job.id;
that.makeReq_('GET', path, requestQuery, null, responseHandler);
} else {
callback = callback || util.noop;
runQuery();
// Create a job.
that.makeReq_('POST', '/queries', null, options, responseHandler);
}

function runQuery() {
if (job) {
// Get results of the query.
var path = '/queries/' + job.id;
that.makeReq_('GET', path, requestQuery, null, responseHandler);
} else {
// Create a job.
that.makeReq_('POST', '/queries', null, options, responseHandler);
function responseHandler(err, resp) {
if (err) {
callback(err, null, null, resp);
return;
}

function responseHandler(err, resp) {
if (err) {
onComplete(err, null, null, resp);
return;
}

var rows = [];
if (resp.schema && resp.rows) {
rows = Table.mergeSchemaWithRows_(resp.schema, resp.rows);
}

var nextQuery = null;
if (resp.jobComplete === false) {
// Query is still running.
nextQuery = extend({}, options);
} else if (resp.pageToken) {
// More results exist.
nextQuery = extend({}, options, {
pageToken: resp.pageToken
});
}
if (nextQuery && !nextQuery.job && resp.jobReference.jobId) {
// Create a prepared Job to continue the query.
nextQuery.job = that.job(resp.jobReference.jobId);
}

onComplete(null, rows, nextQuery, resp);
var rows = [];
if (resp.schema && resp.rows) {
rows = Table.mergeSchemaWithRows_(resp.schema, resp.rows);
}

function onComplete(err, rows, nextQuery, resp) {
if (err) {
if (stream) {
stream.emit('error', err);
stream.end();
} else {
callback(err, null, null, resp);
}
return;
}

if (stream) {
rows.forEach(function(row) {
stream.push(row);
});

if (nextQuery) {
that.query(nextQuery, onComplete);
} else {
stream.end();
}
} else {
callback(null, rows, nextQuery, resp);
}
var nextQuery = null;
if (resp.jobComplete === false) {
// Query is still running.
nextQuery = extend({}, options);
} else if (resp.pageToken) {
// More results exist.
nextQuery = extend({}, options, {
pageToken: resp.pageToken
});
}
if (nextQuery && !nextQuery.job && resp.jobReference.jobId) {
// Create a prepared Job to continue the query.
nextQuery.job = that.job(resp.jobReference.jobId);
}

callback(null, rows, nextQuery, resp);
}
};

Expand Down Expand Up @@ -564,4 +594,11 @@ BigQuery.prototype.makeReq_ = function(method, path, query, body, callback) {
this.makeAuthorizedRequest_(reqOpts, callback);
};

/*! Developer Documentation
*
* These methods can be used with either a callback or as a readable object
* stream. `streamRouter` is used to add this dual behavior.
*/
streamRouter.extend(BigQuery, ['getDatasets', 'getJobs', 'query']);

module.exports = BigQuery;
Loading

0 comments on commit 82bd654

Please sign in to comment.