Skip to content

Commit

Permalink
core: refactor util.makeWritableStream
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Aug 28, 2015
1 parent 0602fd4 commit 081aaa2
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 155 deletions.
70 changes: 19 additions & 51 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ var request = require('request').defaults({
var retryRequest = require('retry-request');
var streamForward = require('stream-forward');
var through = require('through2');
var uuid = require('node-uuid');

/** @const {object} gcloud-node's package.json file. */
var PKG = require('../../package.json');
Expand Down Expand Up @@ -214,79 +213,50 @@ util.parseApiResp = parseApiResp;
*/
function makeWritableStream(dup, options, onComplete) {
onComplete = onComplete || noop;

options = options || {};
options.metadata = options.metadata || {};

var boundary = uuid.v4();
var writeStream = through();
dup.setWritable(writeStream);

var defaults = {
var defaultReqOpts = {
method: 'POST',
qs: {
uploadType: 'multipart'
}
};

// Extend the provided request object with common defaults.
//
// `options.request` takes precedence over the defaults, but not over the
// headers, as these set up the boundary.
var reqOpts = extend(true, defaults, options.request, {
headers: {
'Content-Type': 'multipart/related; boundary="' + boundary + '"'
}
var metadata = options.metadata || {};

var reqOpts = extend(true, defaultReqOpts, options.request, {
multipart: [
{
'Content-Type': 'application/json',
body: JSON.stringify(metadata)
},
{
'Content-Type': metadata.contentType || 'application/octet-stream',
body: writeStream
}
]
});

// With the provided connection, be sure we have a valid token before
// attempting to create a request.
options.makeAuthorizedRequest(reqOpts, {
onAuthorized: function(err, authorizedReqOpts) {
if (err) {
dup.destroy(err);
return;
}

var streamType =
options.metadata.contentType || 'application/octet-stream';

var stream = request(authorizedReqOpts);
stream.callback = noop;

// Write the metadata to the request.
stream.write('--' + boundary + '\n');
stream.write('Content-Type: application/json\n\n');
stream.write(JSON.stringify(options.metadata));
stream.write('\n\n');
stream.write('--' + boundary + '\n');
stream.write('Content-Type: ' + streamType + '\n\n');

// Overwrite the `end` function, so we can close the boundary.
var oldEndFn = stream.end;
stream.end = function(data, encoding, callback) {
data = (data || '') + '\n--' + boundary + '--\n';
stream.write(data, encoding, callback);
oldEndFn.apply(this);
};

// When the request is complete, parse it. If everything went well, pass
// the parsed response data to the callback handler.
stream.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, data) {
request(authorizedReqOpts, function(err, resp, body) {
util.handleResp(err, resp, body, function(err, data) {
if (err) {
dup.destroy(err);
return;
}

onComplete(data);
});
});

// We have a writable stream - tell Duplexify about it, so it can resume
// processing incoming data.
dup.setWritable(stream);

// Keep part of the stream open to keep Request from closing the
// connection. Reference: http://goo.gl/zZVSif.
dup.pipe(stream);
}
});
}
Expand Down Expand Up @@ -332,8 +302,6 @@ util.shouldRetryRequest = shouldRetryRequest;
* response is related to rate limits or certain intermittent server errors.
* We will exponentially backoff subsequent requests by default. (default:
* true)
* @param {object=} config.authClient - AuthClient object. If not provided,
* it will be created and cached here.
* @param {object=} config.credentials - Credentials object.
* @param {boolean=} config.customEndpoint - If true, just return the provided
* request options. Default: false.
Expand Down
13 changes: 6 additions & 7 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -1588,13 +1588,13 @@ File.prototype.startResumableUpload_ = function(stream, metadata) {
* `startResumableUpload_`, which uses the resumable upload technique, this
* method uses a simple upload (all or nothing).
*
* @param {Duplexify} stream - Duplexify stream of data to pipe to the file.
* @param {Duplexify} dup - Duplexify stream of data to pipe to the file.
* @param {object=} metadata - Optional metadata to set on the file.
*
* @private
*/
File.prototype.startSimpleUpload_ = function(stream, metadata) {
var that = this;
File.prototype.startSimpleUpload_ = function(dup, metadata) {
var self = this;

var reqOpts = {
qs: {
Expand All @@ -1610,14 +1610,13 @@ File.prototype.startSimpleUpload_ = function(stream, metadata) {
reqOpts.qs.ifGenerationMatch = this.generation;
}

util.makeWritableStream(stream, {
util.makeWritableStream(dup, {
makeAuthorizedRequest: that.bucket.storage.makeAuthorizedRequest_,
metadata: metadata,
request: reqOpts
}, function(data) {
that.metadata = data;

stream.emit('complete', data);
self.metadata = data;
dup.emit('complete', data);
});
};

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
"is": "^3.0.1",
"methmeth": "^1.0.0",
"mime-types": "^2.0.8",
"node-uuid": "^1.4.2",
"once": "^1.3.1",
"prop-assign": "^1.0.0",
"propprop": "^0.3.0",
Expand All @@ -87,6 +86,7 @@
"mitm": "^1.1.0",
"mocha": "^2.1.0",
"mockery": "^1.4.0",
"node-uuid": "^1.4.3",
"tmp": "0.0.24"
},
"scripts": {
Expand Down
130 changes: 34 additions & 96 deletions test/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ var request = require('request');
var retryRequest = require('retry-request');
var stream = require('stream');
var streamForward = require('stream-forward');
var through = require('through2');

var googleAutoAuthOverride;
function fakeGoogleAutoAuth() {
Expand Down Expand Up @@ -268,13 +267,25 @@ describe('common/util', function() {
describe('makeWritableStream', function() {
it('should use defaults', function(done) {
var dup = duplexify();
var metadata = { a: 'b', c: 'd' };

util.makeWritableStream(dup, {
metadata: metadata,
makeAuthorizedRequest: function(request) {
assert.equal(request.method, 'POST');
assert.equal(request.qs.uploadType, 'multipart');

var contentType = request.headers['Content-Type'];
assert.equal(contentType.indexOf('multipart/related'), 0);
assert.strictEqual(Array.isArray(request.multipart), true);

var mp = request.multipart;

assert.strictEqual(mp[0]['Content-Type'], 'application/json');
assert.strictEqual(mp[0].body, JSON.stringify(metadata));

assert.strictEqual(mp[1]['Content-Type'], 'application/octet-stream');
// (is a writable stream:)
assert.strictEqual(typeof mp[1].body._writableState, 'object');

done();
}
});
Expand All @@ -292,10 +303,17 @@ describe('common/util', function() {
};

util.makeWritableStream(dup, {
metadata: {
contentType: 'application/json'
},
makeAuthorizedRequest: function(request) {
assert.equal(request.method, req.method);
assert.deepEqual(request.qs, req.qs);
assert.equal(request.something, req.something);

var mp = request.multipart;
assert.strictEqual(mp[1]['Content-Type'], 'application/json');

done();
},

Expand All @@ -306,7 +324,7 @@ describe('common/util', function() {
it('should emit an error', function(done) {
var error = new Error('Error.');

var ws = through();
var ws = duplexify();
ws.on('error', function(err) {
assert.equal(err, error);
done();
Expand All @@ -319,93 +337,15 @@ describe('common/util', function() {
});
});

it('should write request', function(done) {
var dup = duplexify();
var boundary;
var metadata = { a: 'b', c: 'd' };

requestOverride = function() {
var written = [];

var req = duplexify();

req.write = function(data) {
written.push(data);
};

req.end = function() {
var boundaryLine = '--' + boundary + '\n';

var startFirstBoundaryIdx = written.indexOf(boundaryLine);
var endFirstBoundaryIdx = written.lastIndexOf(boundaryLine);
var endBoundaryIdx = written.indexOf('\n--' + boundary + '--\n');

assert(startFirstBoundaryIdx > -1);
assert(endFirstBoundaryIdx > startFirstBoundaryIdx);
assert(endBoundaryIdx > -1);

assert(written.indexOf(JSON.stringify(metadata)) > -1);

done();
};

setImmediate(function() {
req.end();
});

return req;
};

util.makeWritableStream(dup, {
metadata: metadata,

makeAuthorizedRequest: function(request, opts) {
var contentType = request.headers['Content-Type'];
boundary = contentType.match(/boundary="([^"]*)/)[1];
opts.onAuthorized();
}
});
});

it('should set the writable stream', function(done) {
var dup = duplexify();
var ws = new stream.Writable();
ws.write = function() {};

requestOverride = function() {
return ws;
};

dup.setWritable = function(writable) {
assert.equal(writable, ws);
dup.setWritable = function() {
done();
};

util.makeWritableStream(dup, {
makeAuthorizedRequest: function(request, opts) {
opts.onAuthorized();
}
});
});

it('should keep the pipe open on the stream', function(done) {
var dup = duplexify();
var ws = new stream.Writable();
ws.write = function() {};

requestOverride = function() {
return ws;
};

dup.pipe = function(writable) {
assert.equal(writable, ws);
done();
};

util.makeWritableStream(dup, {
makeAuthorizedRequest: function(request, opts) {
opts.onAuthorized();
}
makeAuthorizedRequest: function() {}
});
});

Expand All @@ -421,23 +361,21 @@ describe('common/util', function() {
callback(error);
};

requestOverride = function() {
return fakeStream;
};

var options = {
makeAuthorizedRequest: function(request, opts) {
opts.onAuthorized();
}
requestOverride = function(reqOpts, callback) {
callback(error);
};

util.makeWritableStream(dup, options);

dup.on('error', function(err) {
assert.strictEqual(err, error);
done();
});

util.makeWritableStream(dup, {
makeAuthorizedRequest: function(request, opts) {
opts.onAuthorized();
}
});

setImmediate(function() {
fakeStream.emit('complete', {});
});
Expand All @@ -454,8 +392,8 @@ describe('common/util', function() {
callback(null, fakeResponse);
};

requestOverride = function() {
return fakeStream;
requestOverride = function(reqOpts, callback) {
callback();
};

var options = {
Expand Down

0 comments on commit 081aaa2

Please sign in to comment.