From 06cacc9a759eb25803d8b37171ecc5221a69f95c Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Sun, 2 Aug 2015 12:39:02 -0400 Subject: [PATCH] core: use split-array-stream --- lib/common/stream-router.js | 41 +++++++++---------- lib/datastore/request.js | 78 ++++++++++++++----------------------- package.json | 2 +- test/datastore/request.js | 49 +++++++++-------------- 4 files changed, 68 insertions(+), 102 deletions(-) diff --git a/lib/common/stream-router.js b/lib/common/stream-router.js index 23be9d736b4..d5d3f2e6b11 100644 --- a/lib/common/stream-router.js +++ b/lib/common/stream-router.js @@ -21,7 +21,7 @@ 'use strict'; var concat = require('concat-stream'); -var isStreamEnded = require('is-stream-ended'); +var split = require('split-array-stream'); var streamEvents = require('stream-events'); var through = require('through2'); @@ -182,9 +182,9 @@ streamRouter.runAsStream_ = function(parsedArguments, originalMethod) { var stream = streamEvents(through.obj()); - function shouldPushResult() { - return resultsToSend !== 0 && !isStreamEnded(stream); - } + stream.once('reading', function() { + originalMethod(query, onResultSet); + }); function onResultSet(err, results, nextQuery) { if (err) { @@ -193,32 +193,27 @@ streamRouter.runAsStream_ = function(parsedArguments, originalMethod) { return; } - var result; - while ((result = results.shift()) && shouldPushResult()) { - stream.push(result); - resultsToSend--; + if (resultsToSend >= 0 && results.length > resultsToSend) { + results = results.splice(0, resultsToSend); } - if (isStreamEnded(stream)) { - return; - } + resultsToSend -= results.length; - if (resultsToSend === 0) { - stream.end(); - return; - } + split(results, stream, function(streamEnded) { + if (streamEnded) { + return; + } - if (nextQuery) { - originalMethod(nextQuery, onResultSet); - } else { + if (nextQuery && resultsToSend !== 0) { + originalMethod(nextQuery, onResultSet); + return; + } + + stream.push(null); stream.end(); - } + }); } - stream.once('reading', function() { - originalMethod(query, onResultSet); - }); - return stream; }; diff --git a/lib/datastore/request.js b/lib/datastore/request.js index c850178a6f7..4e2bc278802 100644 --- a/lib/datastore/request.js +++ b/lib/datastore/request.js @@ -20,12 +20,13 @@ 'use strict'; -var isStreamEnded = require('is-stream-ended'); +var concat = require('concat-stream'); var request = require('request').defaults({ pool: { maxSockets: Infinity } }); +var split = require('split-array-stream'); var through = require('through2'); /** @@ -101,7 +102,6 @@ function DatastoreRequest() {} * @param {?error} callback.err - An error returned while making this request * @param {module:datastore/entity|module:datastore/entity[]} callback.entity - * Will return either a single Entity or a list of Entities. - * @param {object} callback.apiResponse - The full API response. * * @example * //- @@ -114,7 +114,7 @@ function DatastoreRequest() {} * //- * var key = dataset.key(['Company', 123]); * - * transaction.get(key, function(err, entity, apiResponse) {}); + * transaction.get(key, function(err, entity) {}); * * //- * // Get multiple entities at once with a callback. @@ -124,13 +124,13 @@ function DatastoreRequest() {} * dataset.key(['Product', 'Computer']) * ]; * - * transaction.get(keys, function(err, entities, apiResponse) {}); + * transaction.get(keys, function(err, entities) {}); * * //- * // Or, get the entities as a readable object stream. * //- * transaction.get(keys) - * .on('error', function(err, apiResponse) {}) + * .on('error', function(err) {}) * .on('data', function(entity) { * // entity is an entity object. * }) @@ -139,72 +139,54 @@ function DatastoreRequest() {} * }); */ DatastoreRequest.prototype.get = function(keys, callback) { - var self = this; - - var isStreamMode = !callback; - var stream; - - if (isStreamMode) { - stream = through.obj(); + if (util.is(callback, 'function')) { + // Run this method in stream mode and send the results back to the callback. + this.get(keys) + .on('error', callback) + .pipe(concat(function(results) { + var isSingleLookup = !util.is(keys, 'array'); + callback(null, isSingleLookup ? results[0] : results); + })); + return; } - var isSingleLookup = !util.is(keys, 'array'); keys = util.arrayize(keys).map(entity.keyToKeyProto); if (keys.length === 0) { throw new Error('At least one Key object is required.'); } - var request = { - key: keys - }; - - var entities = []; - this.makeReq_('lookup', request, onApiResponse); + var self = this; + var stream = through.obj(); function onApiResponse(err, resp) { if (err) { - if (isStreamMode) { - stream.emit('error', err, resp); - stream.end(); - } else { - callback(err, null, resp); - } + stream.emit('error', err); + stream.end(); return; } - var results = entity.formatArray(resp.found); + var entities = entity.formatArray(resp.found); var nextKeys = (resp.deferred || []).map(entity.keyFromKeyProto); - if (isStreamMode) { - var result; - while ((result = results.shift()) && !isStreamEnded(stream)) { - stream.push(result); + split(entities, stream, function(streamEnded) { + if (streamEnded) { + return; } - } else { - entities = entities.concat(results); - } - - if (isStreamMode && isStreamEnded(stream)) { - return; - } - if (nextKeys.length > 0) { - self.get(nextKeys, onApiResponse); - return; - } + if (nextKeys.length > 0) { + self.get(nextKeys, onApiResponse); + return; + } - if (isStreamMode) { stream.push(null); stream.end(); - } else { - callback(null, isSingleLookup ? entities[0] : entities, resp); - } + }); } - if (isStreamMode) { - return stream; - } + this.makeReq_('lookup', { key: keys }, onApiResponse); + + return stream; }; /** diff --git a/package.json b/package.json index 653f938c32f..f415f6f00b0 100644 --- a/package.json +++ b/package.json @@ -56,13 +56,13 @@ "duplexify": "^3.2.0", "extend": "^2.0.0", "google-auth-library": "^0.9.4", - "is-stream-ended": "^0.1.0", "mime-types": "^2.0.8", "node-uuid": "^1.4.2", "once": "^1.3.1", "protobufjs": "^3.8.2", "request": "^2.53.0", "retry-request": "^1.1.0", + "split-array-stream": "^1.0.0", "sse4_crc32": "^3.1.0", "stream-events": "^1.0.1", "stream-forward": "^2.0.0", diff --git a/test/datastore/request.js b/test/datastore/request.js index 9a957099a01..5624d13b0b6 100644 --- a/test/datastore/request.js +++ b/test/datastore/request.js @@ -22,7 +22,6 @@ var assert = require('assert'); var ByteBuffer = require('bytebuffer'); var entity = require('../../lib/datastore/entity.js'); var extend = require('extend'); -var isStreamEnded = require('is-stream-ended'); var mockery = require('mockery'); var mockRespGet = require('../testdata/response_get.json'); var pb = require('../../lib/datastore/pb.js'); @@ -181,23 +180,19 @@ describe('Request', function() { }); describe('callback mode', function() { - it('should execute callback with error & API response', function(done) { - request.get(key, function(err, entity, apiResponse_) { + it('should execute callback with error', function(done) { + request.get(key, function(err) { assert.strictEqual(err, error); - assert.strictEqual(entity, null); - assert.strictEqual(apiResponse_, apiResponse); done(); }); }); }); describe('stream mode', function() { - it('should emit error & API response', function(done) { + it('should emit error', function(done) { request.get(key) - .on('error', function(err, apiResponse_) { + .on('error', function(err) { assert.strictEqual(err, error); - assert.strictEqual(apiResponse_, apiResponse); - done(); }); }); @@ -207,7 +202,7 @@ describe('Request', function() { stream.on('error', function() { setImmediate(function() { - assert.strictEqual(isStreamEnded(stream), true); + assert.strictEqual(stream._readableState.ended, true); done(); }); }); @@ -238,39 +233,34 @@ describe('Request', function() { it('should format the results', function(done) { entityOverrides.formatArray = function(arr) { assert.strictEqual(arr, apiResponse.found); - done(); + setImmediate(done); + return arr; }; request.get(key, assert.ifError); }); it('should continue looking for deferred results', function(done) { - var lookupCount = 0; - request.makeReq_ = function(method, req, callback) { - lookupCount++; - - if (lookupCount === 1) { - callback(null, apiResponseWithDeferred); - return; - } - - if (lookupCount > 1) { - done(); - } + callback(null, apiResponseWithDeferred); }; request.get(key, assert.ifError); + + request.get = function(keys) { + var expectedKeys = apiResponseWithDeferred.deferred + .map(entity.keyFromKeyProto); + + assert.deepEqual(keys, expectedKeys); + done(); + }; }); describe('callback mode', function() { - it('should exec callback with results & API response', function(done) { - request.get(key, function(err, entity, apiResponse_) { + it('should exec callback with results', function(done) { + request.get(key, function(err, entity) { assert.ifError(err); - assert.deepEqual(entity, expectedResult); - assert.strictEqual(apiResponse_, apiResponse); - done(); }); }); @@ -280,13 +270,12 @@ describe('Request', function() { callback(null, apiResponseWithMultiEntities); }; - request.get([key, key], function(err, entities, apiResponse) { + request.get([key, key], function(err, entities) { assert.ifError(err); assert.strictEqual(util.is(entities, 'array'), true); assert.deepEqual(entities, expectedResults); - assert.strictEqual(apiResponse, apiResponseWithMultiEntities); done(); }); });