Skip to content

Commit

Permalink
Merge pull request #291 from stephenplusplus/spp--datastore-runQuery-…
Browse files Browse the repository at this point in the history
…stream

datastore: runQuery as a stream. fixes #289
  • Loading branch information
ryanseys committed Nov 7, 2014
2 parents b9767ad + c3cb649 commit 6022e19
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 29 deletions.
82 changes: 67 additions & 15 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
'use strict';

var https = require('https');
var streamEvents = require('stream-events');
var through = require('through2');

/**
* @type module:datastore/entity
Expand Down Expand Up @@ -295,11 +297,20 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
/**
* Datastore allows you to query entities by kind, filter them by property
* filters, and sort them by a property name. Projection and pagination are also
* supported. If more results are available, a query to retrieve the next page
* is provided to the callback function.
* supported.
*
* If you provide a callback, the query is run, and the results are returned as
* the second argument to your callback. A third argument will also exist, which
* is the `endCursor` of the previously-run query. You can use this to extend
* the query you just ran to see if more results exist.
*
* You may also omit the callback to this function to trigger streaming mode.
*
* See below for examples of both approaches.
*
* @param {module:datastore/query} q - Query object.
* @param {function} callback - The callback function.
* @param {function=} callback - The callback function. If omitted, a readable
* stream instance is returned.
*
* @example
* //-
Expand All @@ -313,9 +324,18 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
* var nextQuery = queryObject.start(endCursor);
* transaction.runQuery(nextQuery, function(err, entities, endCursor) {});
* });
*
* //-
* // If you omit the callback, runQuery will automatically call subsequent
* // queries until no results remain. Entity objects will be pushed as they are
* // found.
* //-
* transaction.runQuery(queryObject)
* .on('data', function (entity) {});
*/
DatastoreRequest.prototype.runQuery = function(q, callback) {
callback = callback || util.noop;
var that = this;
var stream;

var req = {
read_options: {},
Expand All @@ -328,19 +348,51 @@ DatastoreRequest.prototype.runQuery = function(q, callback) {
};
}

this.makeReq_('runQuery', req, function(err, resp) {
if (err || !resp.batch || !resp.batch.entity_result) {
callback(err);
return;
}
if (!util.is(callback, 'function')) {
stream = streamEvents(through.obj());
stream.once('reading', runQuery);
return stream;
} else {
callback = callback || util.noop;
runQuery();
}

var cursor = '';
if (resp.batch.end_cursor) {
cursor = resp.batch.end_cursor.toBase64();
}
function runQuery() {
that.makeReq_('runQuery', req, function(err, resp) {
if (err) {
if (stream) {
stream.emit('error', err);
stream.end();
} else {
callback(err);
}
return;
}

callback(null, entity.formatArray(resp.batch.entity_result), cursor);
});
var entities = entity.formatArray(resp.batch.entity_result);

var cursor = '';
if (resp.batch.end_cursor) {
cursor = resp.batch.end_cursor.toBase64();
}

if (stream) {
if (cursor && entities.length > 0) {
entities.forEach(function (entity) {
stream.push(entity);
});

req.query = entity.queryToQueryProto(q.start(cursor).offset(0));

runQuery();
} else {
stream.end();
}
} else {
callback(null, entities, cursor);
}
});
}
};

/**
Expand Down
15 changes: 15 additions & 0 deletions regression/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,21 @@ describe('datastore', function() {
});
});

it('should run a query as a stream', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.limit(5);

var resultsReturned = 0;

ds.runQuery(q)
.on('error', done)
.on('data', function() { resultsReturned++; })
.on('end', function() {
assert.equal(resultsReturned, characters.length);
done();
});
});

it('should filter queries with simple indexes', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.filter('appearances >=', 20);
Expand Down
76 changes: 62 additions & 14 deletions test/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var https = require('https');
var mockRespGet = require('../testdata/response_get.json');
var pb = require('../../lib/datastore/pb.js');
var Query = require('../../lib/datastore/query.js');
var Stream = require('stream');
var util = require('../../lib/common/util.js');

var httpsRequestOverride = util.noop;
Expand Down Expand Up @@ -278,8 +279,7 @@ describe('Request', function() {
entity_result: mockRespGet.found,
end_cursor: new ByteBuffer().writeIString('cursor').flip()
}
},
withoutResults: mockRespGet
}
};

beforeEach(function() {
Expand All @@ -298,18 +298,6 @@ describe('Request', function() {
assert.equal(err, error);
});
});

it('should handle missing results error', function() {
request.makeReq_ = function(method, req, callback) {
assert.equal(method, 'runQuery');
callback(null, mockResponse.withoutResults);
};

request.runQuery(query, function(err, entities) {
assert.strictEqual(err, null);
assert.strictEqual(entities, undefined);
});
});
});

it('should execute callback with results', function() {
Expand Down Expand Up @@ -354,6 +342,66 @@ describe('Request', function() {
done();
});
});

describe('streams', function() {
it('should be a stream if a callback is omitted', function() {
assert(request.runQuery(query) instanceof Stream);
});

it('should run the query after being read from', function(done) {
request.makeReq_ = function() {
done();
};

request.runQuery(query).emit('reading');
});

it('should continuosly run until there are no results', function(done) {
var run = 0;
var timesToRun = 2;

request.makeReq_ = function(method, req, callback) {
run++;

if (run < timesToRun) {
callback(null, mockResponse.withResultsAndEndCursor);
} else {
var lastEndCursor =
mockResponse.withResultsAndEndCursor.batch.end_cursor.toBase64();
lastEndCursor = new Buffer(lastEndCursor, 'base64').toString();

assert.equal(String(req.query.start_cursor), lastEndCursor);
assert.strictEqual(req.query.offset, undefined);

callback(null, mockResponse.withResults);
}
};

var resultsReturned = 0;

request.runQuery(query)
.on('data', function() { resultsReturned++; })
.on('end', function() {
assert.equal(resultsReturned, mockRespGet.found.length);
done();
});
});

it('should emit an error', function(done) {
var error = new Error('Error.');

request.makeReq_ = function(method, req, callback) {
callback(error);
};

request.runQuery(query)
.on('error', function(err) {
assert.equal(err, error);
done();
})
.emit('reading');
});
});
});

describe('allocateIds', function() {
Expand Down

0 comments on commit 6022e19

Please sign in to comment.