diff --git a/lib/common/util.js b/lib/common/util.js index 819f12612589..e36d89975da3 100644 --- a/lib/common/util.js +++ b/lib/common/util.js @@ -147,6 +147,7 @@ function ApiError(errorBody) { this.errors = errorBody.errors; this.code = errorBody.code; this.message = errorBody.message; + this.response = errorBody.response; } util.inherits(ApiError, Error); @@ -180,7 +181,8 @@ function handleResp(err, resp, body, callback) { callback(new ApiError({ errors: [], code: resp.statusCode, - message: body || 'Error during request.' + message: body || 'Error during request.', + response: resp })); return; } diff --git a/lib/storage/file.js b/lib/storage/file.js index 8d727ce9a037..2342851f9a49 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -20,10 +20,13 @@ 'use strict'; +var bufferEqual = require('buffer-equal'); +var ConfigStore = require('configstore'); var crypto = require('crypto'); var duplexify = require('duplexify'); var request = require('request'); var streamEvents = require('stream-events'); +var through = require('through2'); /** * @type module:common/util @@ -264,28 +267,195 @@ File.prototype.createReadStream = function() { */ File.prototype.createWriteStream = function(metadata) { var that = this; + + var bufferStream = through(); + var configStore = new ConfigStore('gcloud-node'); var dup = streamEvents(duplexify()); + var makeAuthorizedRequest = that.bucket.storage.makeAuthorizedRequest_; + var request = require('request'); + var resumableUri; + var retries = 0; + + var RETRY_LIMIT = 3; + + metadata = metadata || {}; dup.once('writing', function() { - util.makeWritableStream(dup, { - makeAuthorizedRequest: that.bucket.storage.makeAuthorizedRequest_, - metadata: metadata, - request: { - qs: { - name: that.name - }, - uri: util.format('{base}/{bucket}/o', { - base: STORAGE_UPLOAD_BASE_URL, - bucket: that.bucket.name - }) + var config = configStore.get(that.name); + + if (config) { + resumeUpload(config.uri); + } else { + startUpload(); + } + }); 
+ + function startUpload() { + var headers = {}; + + if (metadata.contentType) { + headers['X-Upload-Content-Type'] = metadata.contentType; + } + + makeAuthorizedRequest({ + method: 'POST', + uri: util.format('{base}/{bucket}/o', { + base: STORAGE_UPLOAD_BASE_URL, + bucket: that.bucket.name + }), + qs: { + name: that.name, + uploadType: 'resumable' + }, + headers: headers, + json: metadata + }, function(err, res, body) { + if (err) { + dup.emit('error', err); + dup.end(); + return; } - }, function(data) { - that.metadata = data; - dup.emit('complete', data); - dup.end(); + resumableUri = body.headers.location; + configStore.set(that.name, { + uri: resumableUri + }); + resumeUpload(resumableUri, -1); }); - }); + } + + function resumeUpload(uri, lastByteWritten) { + if (util.is(lastByteWritten, 'number')) { + prepareUpload(lastByteWritten); + } else { + getLastByteWritten(uri, prepareUpload); + } + + function prepareUpload(lastByteWritten) { + makeAuthorizedRequest({ + method: 'PUT', + uri: uri + }, { + onAuthorized: function (err, reqOpts) { + if (err) { + if (err.code === 404) { + startUpload(); + return; + } + + if (err.code > 499 && err.code < 600 && retries <= RETRY_LIMIT) { + retries++; + prepareUpload(lastByteWritten); + return; + } + + dup.emit('error', err); + dup.end(); + return; + } + + sendFile(reqOpts, lastByteWritten); + } + }); + } + } + + function sendFile(reqOpts, lastByteWritten) { + var startByte = lastByteWritten + 1; + reqOpts.headers['Content-Range'] = 'bytes ' + startByte + '-*/*'; + + var bytesWritten = 0; + var limitStream = through(function(chunk, enc, next) { + // Determine if this is the same content uploaded previously. 
+ if (bytesWritten === 0) { + var cachedFirstChunk = configStore.get(that.name).firstChunk; + var firstChunk = chunk.slice(0, 16); + + if (!cachedFirstChunk) { + configStore.set(that.name, { + uri: reqOpts.uri, + firstChunk: firstChunk + }); + } else { + cachedFirstChunk = new Buffer(cachedFirstChunk); + firstChunk = new Buffer(firstChunk); + + if (!bufferEqual(cachedFirstChunk, firstChunk)) { + // Different content. Start a new upload. + bufferStream.unshift(chunk); + bufferStream.unpipe(this); + configStore.del(that.name); + startUpload(); + return; + } + } + } + + var length = chunk.length; + + if (util.is(chunk, 'string')) { + length = Buffer.byteLength(chunk, enc); + } + + if (bytesWritten < lastByteWritten) { + chunk = chunk.slice(bytesWritten - length); + } + + bytesWritten += length; + + if (bytesWritten >= lastByteWritten) { + this.push(chunk); + } + + next(); + }); + + bufferStream.pipe(limitStream).pipe(getStream(reqOpts)); + dup.setWritable(bufferStream); + + function getStream(reqOpts) { + var stream = request(reqOpts); + stream.callback = util.noop; + + stream.on('complete', function(res) { + util.handleResp(null, res, res.body, function(err, data) { + if (err) { + dup.emit('error', err); + dup.end(); + return; + } + + that.metadata = data; + dup.emit('complete', that.metadata); + + configStore.del(that.name); + }); + }); + + return stream; + } + } + + // If an upload to this file has previously started, this will return the last + // byte written to it. + function getLastByteWritten(uri, callback) { + makeAuthorizedRequest({ + method: 'PUT', + uri: uri, + headers: { + 'Content-Length': 0, + 'Content-Range': 'bytes */*' + } + }, function(err) { + if (err && err.code === 308) { + // headers.range format: ##-## (e.g. 
0-4915200) + callback(parseInt(err.response.headers.range.split('-')[1], 10)); + return; + } + + callback(-1); + }); + } return dup; }; diff --git a/package.json b/package.json index bde62b4199ab..40d67596055c 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,7 @@ "google storage" ], "dependencies": { + "buffer-equal": "0.0.1", "duplexify": "^3.1.2", "extend": "^1.3.0", "google-service-account": "^1.0.0",