diff --git a/lib/datastore/request.js b/lib/datastore/request.js index 82b907ffadc..7907189b035 100644 --- a/lib/datastore/request.js +++ b/lib/datastore/request.js @@ -21,6 +21,8 @@ 'use strict'; var https = require('https'); +var streamEvents = require('stream-events'); +var through = require('through2'); /** * @type module:datastore/entity @@ -295,11 +297,20 @@ DatastoreRequest.prototype.delete = function(keys, callback) { /** * Datastore allows you to query entities by kind, filter them by property * filters, and sort them by a property name. Projection and pagination are also - * supported. If more results are available, a query to retrieve the next page - * is provided to the callback function. + * supported. + * + * If you provide a callback, the query is run, and the results are returned as + * the second argument to your callback. A third argument will also exist, which + * is the `endCursor` of the previously-run query. You can use this to extend + * the query you just ran to see if more results exist. + * + * You may also omit the callback to this function to trigger streaming mode. + * + * See below for examples of both approaches. * * @param {module:datastore/query} q - Query object. - * @param {function} callback - The callback function. + * @param {function=} callback - The callback function. If omitted, a readable + * stream instance is returned. * * @example * //- @@ -313,9 +324,18 @@ DatastoreRequest.prototype.delete = function(keys, callback) { * var nextQuery = queryObject.start(endCursor); * transaction.runQuery(nextQuery, function(err, entities, endCursor) {}); * }); + * + * //- + * // If you omit the callback, runQuery will automatically call subsequent + * // queries until no results remain. Entity objects will be pushed as they are + * // found. + * //- + * transaction.runQuery(queryObject) + * .on('data', function (entity) {}); */ DatastoreRequest.prototype.runQuery = function(q, callback) { - callback = callback || util.noop; + var that = this; + var stream; var req = { read_options: {}, @@ -328,19 +348,51 @@ DatastoreRequest.prototype.runQuery = function(q, callback) { }; } - this.makeReq_('runQuery', req, function(err, resp) { - if (err || !resp.batch || !resp.batch.entity_result) { - callback(err); - return; - } + if (!util.is(callback, 'function')) { + stream = streamEvents(through.obj()); + stream.once('reading', runQuery); + return stream; + } else { + callback = callback || util.noop; + runQuery(); + } - var cursor = ''; - if (resp.batch.end_cursor) { - cursor = resp.batch.end_cursor.toBase64(); - } + function runQuery() { + that.makeReq_('runQuery', req, function(err, resp) { + if (err) { + if (stream) { + stream.emit('error', err); + stream.end(); + } else { + callback(err); + } + return; + } - callback(null, entity.formatArray(resp.batch.entity_result), cursor); - }); + var entities = entity.formatArray(resp.batch.entity_result); + + var cursor = ''; + if (resp.batch.end_cursor) { + cursor = resp.batch.end_cursor.toBase64(); + } + + if (stream) { + if (cursor && entities.length > 0) { + entities.forEach(function (entity) { + stream.push(entity); + }); + + req.query = entity.queryToQueryProto(q.start(cursor).offset(0)); + + runQuery(); + } else { + stream.end(); + } + } else { + callback(null, entities, cursor); + } + }); + } }; /** diff --git a/regression/datastore.js b/regression/datastore.js index 056c3c89420..149c91801f1 100644 --- a/regression/datastore.js +++ b/regression/datastore.js @@ -248,6 +248,21 @@ describe('datastore', function() { }); }); + it('should run a query as a stream', function(done) { + var q = ds.createQuery('Character').hasAncestor(ancestor) + .limit(5); + + var resultsReturned = 0; + + ds.runQuery(q) + .on('error', done) + .on('data', function() { resultsReturned++; }) + .on('end', function() { + assert.equal(resultsReturned, characters.length); + done(); + }); + }); + it('should filter queries with simple indexes', function(done) { var q = ds.createQuery('Character').hasAncestor(ancestor) .filter('appearances >=', 20); diff --git a/test/datastore/request.js b/test/datastore/request.js index 66ee13ff460..85a9a0e4035 100644 --- a/test/datastore/request.js +++ b/test/datastore/request.js @@ -27,6 +27,7 @@ var https = require('https'); var mockRespGet = require('../testdata/response_get.json'); var pb = require('../../lib/datastore/pb.js'); var Query = require('../../lib/datastore/query.js'); +var Stream = require('stream'); var util = require('../../lib/common/util.js'); var httpsRequestOverride = util.noop; @@ -278,8 +279,7 @@ describe('Request', function() { entity_result: mockRespGet.found, end_cursor: new ByteBuffer().writeIString('cursor').flip() } - }, - withoutResults: mockRespGet + } }; beforeEach(function() { @@ -298,18 +298,6 @@ describe('Request', function() { assert.equal(err, error); }); }); - - it('should handle missing results error', function() { - request.makeReq_ = function(method, req, callback) { - assert.equal(method, 'runQuery'); - callback(null, mockResponse.withoutResults); - }; - - request.runQuery(query, function(err, entities) { - assert.strictEqual(err, null); - assert.strictEqual(entities, undefined); - }); - }); }); it('should execute callback with results', function() { @@ -354,6 +342,66 @@ describe('Request', function() { done(); }); }); + + describe('streams', function() { + it('should be a stream if a callback is omitted', function() { + assert(request.runQuery(query) instanceof Stream); + }); + + it('should run the query after being read from', function(done) { + request.makeReq_ = function() { + done(); + }; + + request.runQuery(query).emit('reading'); + }); + + it('should continuosly run until there are no results', function(done) { + var run = 0; + var timesToRun = 2; + + request.makeReq_ = function(method, req, callback) { + run++; + + if (run < timesToRun) { + callback(null, mockResponse.withResultsAndEndCursor); + } else { + var lastEndCursor = + mockResponse.withResultsAndEndCursor.batch.end_cursor.toBase64(); + lastEndCursor = new Buffer(lastEndCursor, 'base64').toString(); + + assert.equal(String(req.query.start_cursor), lastEndCursor); + assert.strictEqual(req.query.offset, undefined); + + callback(null, mockResponse.withResults); + } + }; + + var resultsReturned = 0; + + request.runQuery(query) + .on('data', function() { resultsReturned++; }) + .on('end', function() { + assert.equal(resultsReturned, mockRespGet.found.length); + done(); + }); + }); + + it('should emit an error', function(done) { + var error = new Error('Error.'); + + request.makeReq_ = function(method, req, callback) { + callback(error); + }; + + request.runQuery(query) + .on('error', function(err) { + assert.equal(err, error); + done(); + }) + .emit('reading'); + }); + }); }); describe('allocateIds', function() {