diff --git a/lib/common/util.js b/lib/common/util.js index 3853a641c89..dc19c7ccb01 100644 --- a/lib/common/util.js +++ b/lib/common/util.js @@ -249,6 +249,10 @@ module.exports.toArray = toArray; function makeWritableStream(dup, options, onComplete) { onComplete = onComplete || noop; + options = options || {}; + options.metadata = options.metadata || {}; + onComplete = onComplete || noop; + var boundary = uuid.v4(); var defaults = { @@ -276,7 +280,7 @@ function makeWritableStream(dup, options, onComplete) { return; } - var streamType = options.streamContentType || 'application/octet-stream'; + var streamType = options.metadata.contentType || 'application/octet-stream'; var stream = options.connection.requester(req); stream.callback = noop; @@ -314,7 +318,7 @@ function makeWritableStream(dup, options, onComplete) { // processing incoming data. dup.setWritable(stream); - // Keep part of the stream open to keep Request from closing the conneciton. + // Keep part of the stream open to keep Request from closing the connection. // Reference: http://goo.gl/zZVSif. dup.pipe(stream); }); diff --git a/lib/storage/file.js b/lib/storage/file.js index 3c62d8a3b29..8aff3844837 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -22,8 +22,7 @@ var crypto = require('crypto'); var duplexify = require('duplexify'); -var extend = require('extend'); -var uuid = require('node-uuid'); +var streamEvents = require('stream-events'); /** * @type module:common/util @@ -259,25 +258,29 @@ File.prototype.createReadStream = function() { */ File.prototype.createWriteStream = function(metadata) { var that = this; - var dup = duplexify(); - this.getWritableStream_(metadata, function(err, writable) { - if (err) { - dup.emit('error', err); - return; - } - writable.on('complete', function(res) { - util.handleResp(null, res, res.body, function(err, data) { - if (err) { - dup.emit('error', err); - return; - } - that.metadata = data; - dup.emit('complete', data); - }); + var dup = streamEvents(duplexify()); + + dup.once('writing', function() { + util.makeWritableStream(dup, { + connection: that.bucket.connection_, + metadata: metadata, + request: { + qs: { + name: that.name, + }, + uri: util.format('{base}/{bucket}/o', { + base: STORAGE_UPLOAD_BASE_URL, + bucket: that.bucket.name + }) + } + }, function(data) { + that.metadata = data; + + dup.emit('complete', data); + dup.end(); }); - dup.setWritable(writable); - dup.pipe(writable); }); + return dup; }; @@ -411,70 +414,4 @@ File.prototype.setMetadata = function(metadata, callback) { }.bind(this)); }; -/*! Developer Documentation - * - * Private Methods - * - * These methods deal with creating and maintaining the lifecycle of a stream. - * All File objects are Duplex streams, which will allow a reader to pipe data - * to the remote endpoint. Likewise, you can pipe data from a remote endpoint to - * a writer. - * - * Duplexify is used to allow us to asynchronously set the readable and writable - * portions of this stream. We can't accept data for buffering until we have - * made an authorized connection. Once we have such a connection, we call - * `setReadable` and/or `setWritable` on the File instance (which is also a - * Duplexify instance), which then opens the pipe for more data to come in or go - * out. - */ - -/** - * Get a remote stream to begin piping a readable stream to. - * - * @private - */ -File.prototype.getWritableStream_ = function(metadata, callback) { - if (!callback) { - callback = metadata; - metadata = {}; - } - var that = this; - var boundary = uuid.v4(); - metadata = extend({ contentType: 'text/plain' }, metadata); - this.bucket.connection_.createAuthorizedReq({ - method: 'POST', - uri: util.format('{base}/{bucket}/o', { - base: STORAGE_UPLOAD_BASE_URL, - bucket: that.bucket.name - }), - qs: { - name: this.name, - uploadType: 'multipart' - }, - headers: { - 'Content-Type': 'multipart/related; boundary="' + boundary + '"' - } - }, function(err, req) { - if (err) { - callback(err); - return; - } - var remoteStream = that.bucket.connection_.requester(req); - remoteStream.callback = util.noop; - remoteStream.write('--' + boundary + '\n'); - remoteStream.write('Content-Type: application/json\n\n'); - remoteStream.write(JSON.stringify(metadata)); - remoteStream.write('\n\n'); - remoteStream.write('--' + boundary + '\n'); - remoteStream.write('Content-Type: ' + metadata.contentType + '\n\n'); - var oldEndFn = remoteStream.end; - remoteStream.end = function(data, encoding, callback) { - data = (data || '') + '\n--' + boundary + '--\n'; - remoteStream.write(data, encoding, callback); - oldEndFn.apply(this); - }; - callback(null, remoteStream); - }); -}; - module.exports = File; diff --git a/test/bigquery/table.js b/test/bigquery/table.js index a6ba3cfdc57..bae5e5c0faa 100644 --- a/test/bigquery/table.js +++ b/test/bigquery/table.js @@ -31,7 +31,7 @@ function FakeFile(a, b) { } var makeWritableStream_Override; -var fakeUtil = extend(util, { +var fakeUtil = extend({}, util, { makeWritableStream: function() { var args = [].slice.call(arguments); (makeWritableStream_Override || util.makeWritableStream).apply(null, args); diff --git a/test/common/util.js b/test/common/util.js index c56494ab780..14874a415fe 100644 --- a/test/common/util.js +++ b/test/common/util.js @@ -19,74 +19,233 @@ 'use strict'; var assert = require('assert'); +var duplexify = require('duplexify'); var util = require('../../lib/common/util.js'); -describe('arrayize', function() { - it('should arrayize if the input is not an array', function(done) { - var o = util.arrayize('text'); - assert.deepEqual(o, ['text']); - done(); +describe('common/util', function() { + describe('arrayize', function() { + it('should arrayize if the input is not an array', function(done) { + var o = util.arrayize('text'); + assert.deepEqual(o, ['text']); + done(); + }); }); -}); -describe('extendGlobalConfig', function() { - it('should favor `keyFilename` when `credentials` is global', function() { - var globalConfig = { credentials: {} }; - var options = util.extendGlobalConfig(globalConfig, { - keyFilename: 'key.json' + describe('extendGlobalConfig', function() { + it('should favor `keyFilename` when `credentials` is global', function() { + var globalConfig = { credentials: {} }; + var options = util.extendGlobalConfig(globalConfig, { + keyFilename: 'key.json' + }); + assert.deepEqual(options, { keyFilename: 'key.json' }); }); - assert.deepEqual(options, { keyFilename: 'key.json' }); - }); - it('should favor `credentials` when `keyFilename` is global', function() { - var globalConfig = { keyFilename: 'key.json' }; - var options = util.extendGlobalConfig(globalConfig, { credentials: {} }); - assert.deepEqual(options, { credentials: {} }); - }); + it('should favor `credentials` when `keyFilename` is global', function() { + var globalConfig = { keyFilename: 'key.json' }; + var options = util.extendGlobalConfig(globalConfig, { credentials: {} }); + assert.deepEqual(options, { credentials: {} }); + }); - it('should not modify original object', function() { - var globalConfig = { keyFilename: 'key.json' }; - util.extendGlobalConfig(globalConfig, { credentials: {} }); - assert.deepEqual(globalConfig, { keyFilename: 'key.json' }); + it('should not modify original object', function() { + var globalConfig = { keyFilename: 'key.json' }; + util.extendGlobalConfig(globalConfig, { credentials: {} }); + assert.deepEqual(globalConfig, { keyFilename: 'key.json' }); + }); }); -}); -describe('handleResp', function() { - it('should handle errors', function(done) { - var defaultErr = new Error('new error'); - util.handleResp(defaultErr, null, null, function(err) { - assert.equal(err, defaultErr); - done(); + describe('handleResp', function() { + it('should handle errors', function(done) { + var defaultErr = new Error('new error'); + util.handleResp(defaultErr, null, null, function(err) { + assert.equal(err, defaultErr); + done(); + }); }); - }); - it('should handle body errors', function(done) { - var apiErr = { - errors: [{ foo: 'bar' }], - code: 400, - message: 'an error occurred' - }; - util.handleResp(null, {}, { error: apiErr }, function(err) { - assert.deepEqual(err.errors, apiErr.errors); - assert.strictEqual(err.code, apiErr.code); - assert.deepEqual(err.message, apiErr.message); - done(); + it('should handle body errors', function(done) { + var apiErr = { + errors: [{ foo: 'bar' }], + code: 400, + message: 'an error occurred' + }; + util.handleResp(null, {}, { error: apiErr }, function(err) { + assert.deepEqual(err.errors, apiErr.errors); + assert.strictEqual(err.code, apiErr.code); + assert.deepEqual(err.message, apiErr.message); + done(); + }); }); - }); - it('should try to parse JSON if body is string', function(done) { - var body = '{ "foo": "bar" }'; - util.handleResp(null, {}, body, function(err, body) { - assert.strictEqual(body.foo, 'bar'); - done(); + it('should try to parse JSON if body is string', function(done) { + var body = '{ "foo": "bar" }'; + util.handleResp(null, {}, body, function(err, body) { + assert.strictEqual(body.foo, 'bar'); + done(); + }); + }); + + it('should return err code if there are not other errors', function(done) { + util.handleResp(null, { statusCode: 400 }, null, function(err) { + assert.strictEqual(err.code, 400); + assert.strictEqual(err.message, 'Error during request.'); + done(); + }); }); }); - it('should return error code if there are not other errors', function(done) { - util.handleResp(null, { statusCode: 400 }, null, function(err) { - assert.strictEqual(err.code, 400); - assert.strictEqual(err.message, 'Error during request.'); - done(); + describe('makeWritableStream', function() { + it('should use defaults', function(done) { + var dup = duplexify(); + util.makeWritableStream(dup, { + connection: { + createAuthorizedReq: function(request) { + assert.equal(request.method, 'POST'); + assert.equal(request.qs.uploadType, 'multipart'); + + var contentType = request.headers['Content-Type']; + assert.equal(contentType.indexOf('multipart/related'), 0); + done(); + } + } + }); + }); + + it('should allow overriding defaults', function(done) { + var dup = duplexify(); + + var req = { + method: 'PUT', + qs: { + uploadType: 'media' + }, + something: 'else' + }; + + util.makeWritableStream(dup, { + connection: { + createAuthorizedReq: function(request) { + assert.equal(request.method, req.method); + assert.deepEqual(request.qs, req.qs); + assert.equal(request.something, req.something); + done(); + } + }, + + request: req + }); + }); + + it('should emit an error', function(done) { + var error = new Error('Error.'); + + var dup = duplexify(); + dup.on('error', function(err) { + assert.equal(err, error); + done(); + }); + + util.makeWritableStream(dup, { + connection: { + createAuthorizedReq: function(request, callback) { + callback(error); + } + } + }); + }); + + it('should write request', function(done) { + var dup = duplexify(); + var boundary; + var metadata = { a: 'b', c: 'd' }; + + util.makeWritableStream(dup, { + metadata: metadata, + + connection: { + createAuthorizedReq: function(request, callback) { + var contentType = request.headers['Content-Type']; + // Match the UUID boundary from the contentType + boundary = contentType.match(/boundary="([^"]*)/)[1]; + callback(); + }, + + requester: function() { + var written = []; + + var req = duplexify(); + + req.write = function(data) { + written.push(data); + }; + + req.end = function() { + var boundaryLine = '--' + boundary + '\n'; + + var startFirstBoundaryIdx = written.indexOf(boundaryLine); + var endFirstBoundaryIdx = written.lastIndexOf(boundaryLine); + var endBoundaryIdx = written.indexOf('\n--' + boundary + '--\n'); + + assert(startFirstBoundaryIdx > -1); + assert(endFirstBoundaryIdx > startFirstBoundaryIdx); + assert(endBoundaryIdx > -1); + + assert(written.indexOf(JSON.stringify(metadata)) > -1); + + done(); + }; + + setImmediate(function() { + req.end(); + }); + + return req; + } + } + }); + }); + + it('should set the writable stream', function(done) { + var dup = duplexify(); + var stream = duplexify(); + + dup.setWritable = function(writable) { + assert.equal(writable, stream); + done(); + }; + + util.makeWritableStream(dup, { + connection: { + createAuthorizedReq: function(request, callback) { + callback(); + }, + + requester: function() { + return stream; + } + } + }); + }); + + it('should keep the pipe open on the stream', function(done) { + var dup = duplexify(); + var stream = duplexify(); + + dup.pipe = function(writable) { + assert.equal(writable, stream); + done(); + }; + + util.makeWritableStream(dup, { + connection: { + createAuthorizedReq: function(request, callback) { + callback(); + }, + + requester: function() { + return stream; + } + } + }); }); }); }); diff --git a/test/storage/file.js b/test/storage/file.js index 30a6c249187..8a7eb356eea 100644 --- a/test/storage/file.js +++ b/test/storage/file.js @@ -43,10 +43,20 @@ function FakeDuplexify() { } nodeutil.inherits(FakeDuplexify, duplexify); +var makeWritableStream_Override; +var fakeUtil = extend({}, util, { + makeWritableStream: function() { + var args = [].slice.call(arguments); + (makeWritableStream_Override || util.makeWritableStream).apply(null, args); + makeWritableStream_Override = null; + } +}); + var File = require('sandboxed-module') .require('../../lib/storage/file.js', { requires: { - 'duplexify': FakeDuplexify + 'duplexify': FakeDuplexify, + '../common/util': fakeUtil } }); @@ -263,48 +273,45 @@ describe('File', function() { describe('createWriteStream', function() { it('should get a writable stream', function(done) { - file.getWritableStream_ = function() { + makeWritableStream_Override = function() { done(); }; - file.createWriteStream(); + file.createWriteStream().emit('writing'); }); - it('should emit an error if one is returned', function(done) { - var error = new Error('Error.'); - file.getWritableStream_ = function(metadata, callback) { - setImmediate(function() { - callback(error); - }); - }; - file.createWriteStream() - .once('error', function(err) { - assert.equal(err, error); - done(); - }); - }); + it('should emit complete event when writing is complete', function(done) { + var fakeResponse = { fake: 'data' }; - it('should set writable stream from the response', function() { - var dup = duplexify(); - file.getWritableStream_ = function(metadata, callback) { - callback(null, dup); + makeWritableStream_Override = function(stream, options, callback) { + callback(fakeResponse); }; - file.createWriteStream(); - assert.deepEqual(writableStream, dup); - writableStream = null; - }); - it('should emit complete event when writable is complete', function(done) { - var dup = duplexify(); - var fakeResponse = { body: { fake: 'data' } }; - file.getWritableStream_ = function(metadata, callback) { - callback(null, dup); - }; file.createWriteStream() .on('complete', function(data) { - assert.deepEqual(data, fakeResponse.body); + assert.deepEqual(data, fakeResponse); + assert.deepEqual(file.metadata, data); done(); + }) + .emit('writing'); + }); + + it('should pass the required arguments', function(done) { + var metadata = { a: 'b', c: 'd' }; + + makeWritableStream_Override = function(stream, options) { + assert.deepEqual(options.connection, file.bucket.connection_); + assert.deepEqual(options.metadata, metadata); + assert.deepEqual(options.request, { + qs: { + name: file.name + }, + uri: 'https://www.googleapis.com/upload/storage/v1/b/' + + file.bucket.name + '/o' }); - dup.emit('complete', fakeResponse); + done(); + }; + + file.createWriteStream(metadata).emit('writing'); }); }); @@ -440,92 +447,4 @@ describe('File', function() { }); }); }); - - describe('getWritableStream_', function() { - var query; - var written; - var fakeStream = { - write: function(chunk) { - written.push(chunk); - } - }; - - beforeEach(function() { - query = {}; - written = []; - file.bucket.connection_.requester = function() { - return fakeStream; - }; - file.bucket.connection_.createAuthorizedReq = function(q, callback) { - query = q; - callback(null, fakeStream); - }; - }); - - function find(term) { - return written.some(function(chunk) { - return chunk.indexOf(term) > -1; - }); - } - - it('should set correct method', function() { - file.getWritableStream_(util.noop); - assert.equal(query.method, 'POST'); - }); - - it('should send to correct uri', function() { - file.getWritableStream_(util.noop); - var expected = - 'https://www.googleapis.com/upload/storage/v1/b/' + file.bucket.name + - '/o'; - assert.equal(query.uri, expected); - }); - - it('should set name on query string', function() { - file.getWritableStream_(util.noop); - assert.equal(query.qs.name, file.name); - }); - - it('should set multipart upload type', function() { - file.getWritableStream_(util.noop); - assert.equal(query.qs.uploadType, 'multipart'); - }); - - it('should set multipart content type in header', function() { - file.getWritableStream_(util.noop); - assert(query.headers['Content-Type'].indexOf('multipart/related;') === 0); - }); - - it('should set default contentType', function() { - file.getWritableStream_(util.noop); - assert(find('Content-Type: text/plain')); - }); - - it('should allow overwriting metadata', function() { - file.getWritableStream_({ - contentType: 'something/custom' - }, util.noop); - assert(find('Content-Type: something/custom')); - }); - - it('should send metadata from the instance', function() { - var metadata = { - hi: 'there', - good: 'sir', - metadata: { - properties: 'have values, often times' - } - }; - file.getWritableStream_(metadata, util.noop); - var expectedMetadata = extend({ contentType: 'text/plain' }, metadata); - assert(find(JSON.stringify(expectedMetadata))); - }); - - it('should execute callback with remote stream', function(done) { - file.getWritableStream_(function(err, stream) { - assert.equal(stream, fakeStream); - done(); - }); - }); - }); });