Skip to content

Commit

Permalink
core: use split-array-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Aug 1, 2015
1 parent a5e330c commit 30c828f
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 102 deletions.
41 changes: 18 additions & 23 deletions lib/common/stream-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
'use strict';

var concat = require('concat-stream');
var isStreamEnded = require('is-stream-ended');
var split = require('split-array-stream');
var streamEvents = require('stream-events');
var through = require('through2');

Expand Down Expand Up @@ -182,9 +182,9 @@ streamRouter.runAsStream_ = function(parsedArguments, originalMethod) {

var stream = streamEvents(through.obj());

function shouldPushResult() {
return resultsToSend !== 0 && !isStreamEnded(stream);
}
stream.once('reading', function() {
originalMethod(query, onResultSet);
});

function onResultSet(err, results, nextQuery) {
if (err) {
Expand All @@ -193,32 +193,27 @@ streamRouter.runAsStream_ = function(parsedArguments, originalMethod) {
return;
}

var result;
while ((result = results.shift()) && shouldPushResult()) {
stream.push(result);
resultsToSend--;
if (resultsToSend >= 0 && results.length > resultsToSend) {
results = results.splice(0, resultsToSend);
}

if (isStreamEnded(stream)) {
return;
}
resultsToSend -= results.length;

if (resultsToSend === 0) {
stream.end();
return;
}
split(results, stream, function(streamEnded) {
if (streamEnded) {
return;
}

if (nextQuery) {
originalMethod(nextQuery, onResultSet);
} else {
if (nextQuery && resultsToSend !== 0) {
originalMethod(nextQuery, onResultSet);
return;
}

stream.push(null);
stream.end();
}
});
}

stream.once('reading', function() {
originalMethod(query, onResultSet);
});

return stream;
};

Expand Down
78 changes: 30 additions & 48 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

'use strict';

var isStreamEnded = require('is-stream-ended');
var concat = require('concat-stream');
var request = require('request').defaults({
pool: {
maxSockets: Infinity
}
});
var split = require('split-array-stream');
var through = require('through2');

/**
Expand Down Expand Up @@ -101,7 +102,6 @@ function DatastoreRequest() {}
* @param {?error} callback.err - An error returned while making this request
* @param {module:datastore/entity|module:datastore/entity[]} callback.entity -
* Will return either a single Entity or a list of Entities.
* @param {object} callback.apiResponse - The full API response.
*
* @example
* //-
Expand All @@ -114,7 +114,7 @@ function DatastoreRequest() {}
* //-
* var key = dataset.key(['Company', 123]);
*
* transaction.get(key, function(err, entity, apiResponse) {});
* transaction.get(key, function(err, entity) {});
*
* //-
* // Get multiple entities at once with a callback.
Expand All @@ -124,13 +124,13 @@ function DatastoreRequest() {}
* dataset.key(['Product', 'Computer'])
* ];
*
* transaction.get(keys, function(err, entities, apiResponse) {});
* transaction.get(keys, function(err, entities) {});
*
* //-
* // Or, get the entities as a readable object stream.
* //-
* transaction.get(keys)
* .on('error', function(err, apiResponse) {})
* .on('error', function(err) {})
* .on('data', function(entity) {
* // entity is an entity object.
* })
Expand All @@ -139,72 +139,54 @@ function DatastoreRequest() {}
* });
*/
DatastoreRequest.prototype.get = function(keys, callback) {
var self = this;

var isStreamMode = !callback;
var stream;

if (isStreamMode) {
stream = through.obj();
if (util.is(callback, 'function')) {
// Run this method in stream mode and send the results back to the callback.
this.get(keys)
.on('error', callback)
.pipe(concat(function(results) {
var isSingleLookup = !util.is(keys, 'array');
callback(null, isSingleLookup ? results[0] : results);
}));
return;
}

var isSingleLookup = !util.is(keys, 'array');
keys = util.arrayize(keys).map(entity.keyToKeyProto);

if (keys.length === 0) {
throw new Error('At least one Key object is required.');
}

var request = {
key: keys
};

var entities = [];
this.makeReq_('lookup', request, onApiResponse);
var self = this;
var stream = through.obj();

function onApiResponse(err, resp) {
if (err) {
if (isStreamMode) {
stream.emit('error', err, resp);
stream.end();
} else {
callback(err, null, resp);
}
stream.emit('error', err);
stream.end();
return;
}

var results = entity.formatArray(resp.found);
var entities = entity.formatArray(resp.found);
var nextKeys = (resp.deferred || []).map(entity.keyFromKeyProto);

if (isStreamMode) {
var result;
while ((result = results.shift()) && !isStreamEnded(stream)) {
stream.push(result);
split(entities, stream, function(streamEnded) {
if (streamEnded) {
return;
}
} else {
entities = entities.concat(results);
}

if (isStreamMode && isStreamEnded(stream)) {
return;
}

if (nextKeys.length > 0) {
self.get(nextKeys, onApiResponse);
return;
}
if (nextKeys.length > 0) {
self.get(nextKeys, onApiResponse);
return;
}

if (isStreamMode) {
stream.push(null);
stream.end();
} else {
callback(null, isSingleLookup ? entities[0] : entities, resp);
}
});
}

if (isStreamMode) {
return stream;
}
this.makeReq_('lookup', { key: keys }, onApiResponse);

return stream;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@
"duplexify": "^3.2.0",
"extend": "^2.0.0",
"google-auth-library": "^0.9.4",
"is-stream-ended": "^0.1.0",
"mime-types": "^2.0.8",
"node-uuid": "^1.4.2",
"once": "^1.3.1",
"protobufjs": "^3.8.2",
"request": "^2.53.0",
"retry-request": "^1.1.0",
"split-array-stream": "^1.0.0",
"sse4_crc32": "^3.1.0",
"stream-events": "^1.0.1",
"stream-forward": "^2.0.0",
Expand Down
49 changes: 19 additions & 30 deletions test/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ var assert = require('assert');
var ByteBuffer = require('bytebuffer');
var entity = require('../../lib/datastore/entity.js');
var extend = require('extend');
var isStreamEnded = require('is-stream-ended');
var mockery = require('mockery');
var mockRespGet = require('../testdata/response_get.json');
var pb = require('../../lib/datastore/pb.js');
Expand Down Expand Up @@ -181,23 +180,19 @@ describe('Request', function() {
});

describe('callback mode', function() {
it('should execute callback with error & API response', function(done) {
request.get(key, function(err, entity, apiResponse_) {
it('should execute callback with error', function(done) {
request.get(key, function(err) {
assert.strictEqual(err, error);
assert.strictEqual(entity, null);
assert.strictEqual(apiResponse_, apiResponse);
done();
});
});
});

describe('stream mode', function() {
it('should emit error & API response', function(done) {
it('should emit error', function(done) {
request.get(key)
.on('error', function(err, apiResponse_) {
.on('error', function(err) {
assert.strictEqual(err, error);
assert.strictEqual(apiResponse_, apiResponse);

done();
});
});
Expand All @@ -207,7 +202,7 @@ describe('Request', function() {

stream.on('error', function() {
setImmediate(function() {
assert.strictEqual(isStreamEnded(stream), true);
assert.strictEqual(stream._readableState.ended, true);
done();
});
});
Expand Down Expand Up @@ -238,39 +233,34 @@ describe('Request', function() {
it('should format the results', function(done) {
entityOverrides.formatArray = function(arr) {
assert.strictEqual(arr, apiResponse.found);
done();
setImmediate(done);
return arr;
};

request.get(key, assert.ifError);
});

it('should continue looking for deferred results', function(done) {
var lookupCount = 0;

request.makeReq_ = function(method, req, callback) {
lookupCount++;

if (lookupCount === 1) {
callback(null, apiResponseWithDeferred);
return;
}

if (lookupCount > 1) {
done();
}
callback(null, apiResponseWithDeferred);
};

request.get(key, assert.ifError);

request.get = function(keys) {
var expectedKeys = apiResponseWithDeferred.deferred
.map(entity.keyFromKeyProto);

assert.deepEqual(keys, expectedKeys);
done();
};
});

describe('callback mode', function() {
it('should exec callback with results & API response', function(done) {
request.get(key, function(err, entity, apiResponse_) {
it('should exec callback with results', function(done) {
request.get(key, function(err, entity) {
assert.ifError(err);

assert.deepEqual(entity, expectedResult);
assert.strictEqual(apiResponse_, apiResponse);

done();
});
});
Expand All @@ -280,13 +270,12 @@ describe('Request', function() {
callback(null, apiResponseWithMultiEntities);
};

request.get([key, key], function(err, entities, apiResponse) {
request.get([key, key], function(err, entities) {
assert.ifError(err);

assert.strictEqual(util.is(entities, 'array'), true);
assert.deepEqual(entities, expectedResults);

assert.strictEqual(apiResponse, apiResponseWithMultiEntities);
done();
});
});
Expand Down

0 comments on commit 30c828f

Please sign in to comment.