Skip to content

Commit

Permalink
storage: clean write streams
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Aug 26, 2015
1 parent f306dcf commit cd9f7ba
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 126 deletions.
2 changes: 1 addition & 1 deletion lib/storage/bucket.js
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ Bucket.prototype.upload = function(localPath, options, callback) {
.on('error', function(err) {
callback(err);
})
.on('complete', function() {
.on('finish', function() {
callback(null, newFile);
});
}
Expand Down
151 changes: 75 additions & 76 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var fs = require('fs');
var hashStreamValidation = require('hash-stream-validation');
var is = require('is');
var once = require('once');
var pumpify = require('pumpify');
var request = require('request').defaults({
pool: {
maxSockets: Infinity
Expand Down Expand Up @@ -396,7 +397,7 @@ File.prototype.move = function(destination, callback) {
* .on('response', function(response) {
* // Server connected and responded with the specified status and headers.
* })
* .on('complete', function() {
* .on('end', function() {
* // The file is fully downloaded.
* })
* .pipe(fs.createWriteStream(localFilename));
Expand Down Expand Up @@ -608,7 +609,7 @@ File.prototype.createReadStream = function(options) {
* fs.createReadStream('/Users/stephen/Photos/birthday-at-the-zoo/panda.jpg')
* .pipe(image.createWriteStream())
* .on('error', function(err) {})
* .on('complete', function(metadata) {
* .on('finish', function() {
* // The file upload is complete.
* });
*
Expand All @@ -621,7 +622,7 @@ File.prototype.createReadStream = function(options) {
* fs.createReadStream('/Users/stephen/site/index.html')
* .pipe(htmlFile.createWriteStream({ gzip: true }))
* .on('error', function(err) {})
* .on('complete', function(metadata) {
* .on('finish', function() {
* // The file upload is complete.
* });
*
Expand Down Expand Up @@ -651,14 +652,14 @@ File.prototype.createReadStream = function(options) {
* }
* }))
* .on('error', function(err) {})
* .on('complete', function(metadata) {
* .on('finish', function() {
* // The file upload is complete.
* });
*/
File.prototype.createWriteStream = function(options) {
options = options || {};

var that = this;
var self = this;

var gzip = options.gzip;

Expand All @@ -667,104 +668,102 @@ File.prototype.createWriteStream = function(options) {
metadata.contentEncoding = 'gzip';
}

var crc32c = false;
var md5 = false;
var crc32c = options.validation !== false;
var md5 = options.validation !== false;

if (is.string(options.validation)) {
options.validation = options.validation.toLowerCase();
crc32c = options.validation === 'crc32c';
md5 = options.validation === 'md5';
}

if (is.undef(options.validation)) {
crc32c = true;
md5 = true;
}

// Collect data as it comes in to store in a hash. This is compared to the
// checksum value on the returned metadata from the API.
var validateStream = hashStreamValidation({
crc32c: crc32c,
md5: md5
});

var writableStream = streamEvents(duplexify());

var throughStream = through();
var fileWriteStream = duplexify();

throughStream
var stream = streamEvents(pumpify([
gzip ? zlib.createGzip() : through(),
validateStream,
fileWriteStream
]));

.pipe(gzip ? zlib.createGzip() : through())

.pipe(validateStream)
// Wait until we've received data to determine what upload technique to use.
stream.on('writing', function() {
if (options.resumable === false) {
self.startSimpleUpload_(fileWriteStream, metadata);
} else {
self.startResumableUpload_(fileWriteStream, metadata);
}
});

.pipe(writableStream)
// This is to preserve the `finish` event. We wait until the request stream
// emits "complete", as that is when we do validation of the data. After that
// is successful, we can allow the stream to naturally finish.
//
// Reference for tracking when we can use a non-hack solution:
// https://github.com/nodejs/node/pull/2314
fileWriteStream.on('prefinish', function() {
stream.cork();
});

// Wait until we've received data to determine what upload technique to use.
.once('writing', function() {
if (options.resumable === false) {
that.startSimpleUpload_(writableStream, metadata);
} else {
that.startResumableUpload_(writableStream, metadata);
}
})
// Compare our hashed version vs the completed upload's version.
fileWriteStream.on('complete', function(metadata) {
var failed = true;

// Catch any errors from the writable stream and patch them upstream.
.on('error', function(err) {
throughStream.destroy(err);
})
if (crc32c && metadata.crc32c) {
// We must remove the first four bytes from the returned checksum.
// http://stackoverflow.com/questions/25096737/
// base64-encoding-of-crc32c-long-value
failed = !validateStream.test('crc32c', metadata.crc32c.substr(4));
}

// Compare our hashed version vs the completed upload's version.
.on('complete', function(metadata) {
var failed = false;
if (md5 && metadata.md5Hash) {
failed = !validateStream.test('md5', metadata.md5Hash);
}

if (crc32c && metadata.crc32c) {
// We must remove the first four bytes from the returned checksum.
// http://stackoverflow.com/questions/25096737/
// base64-encoding-of-crc32c-long-value
failed = !validateStream.test('crc32c', metadata.crc32c.substr(4));
}
if (failed) {
self.delete(function(err) {
var code;
var message;

if (md5 && metadata.md5Hash) {
failed = !validateStream.test('md5', metadata.md5Hash);
}
if (err) {
code = 'FILE_NO_UPLOAD_DELETE';
message = [
'The uploaded data did not match the data from the server. As a',
'precaution, we attempted to delete the file, but it was not',
'successful. To be sure the content is the same, you should try',
'removing the file manually, then uploading the file again.',
'\n\nThe delete attempt failed with this message:',
'\n\n ' + err.message
].join(' ');
} else {
code = 'FILE_NO_UPLOAD';
message = [
'The uploaded data did not match the data from the server. As a',
'precaution, the file has been deleted. To be sure the content',
'is the same, you should try uploading the file again.'
].join(' ');
}

if (failed) {
that.delete(function(err) {
var code;
var message;
var error = new Error(message);
error.code = code;
error.errors = [err];

if (err) {
code = 'FILE_NO_UPLOAD_DELETE';
message = [
'The uploaded data did not match the data from the server. As a',
'precaution, we attempted to delete the file, but it was not',
'successful. To be sure the content is the same, you should try',
'removing the file manually, then uploading the file again.',
'\n\nThe delete attempt failed with this message:',
'\n\n ' + err.message
].join(' ');
} else {
code = 'FILE_NO_UPLOAD';
message = [
'The uploaded data did not match the data from the server. As a',
'precaution, the file has been deleted. To be sure the content',
'is the same, you should try uploading the file again.'
].join(' ');
}
fileWriteStream.destroy(error);
});

var error = new Error(message);
error.code = code;
error.errors = [err];
return;
}

throughStream.destroy(error);
});
} else {
throughStream.emit('complete', metadata);
}
});
stream.uncork();
});

return throughStream;
return stream;
};

/**
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"prop-assign": "^1.0.0",
"propprop": "^0.3.0",
"protobufjs": "^3.8.2",
"pumpify": "^1.3.3",
"request": "^2.53.0",
"retry-request": "^1.2.1",
"split-array-stream": "^1.0.0",
Expand Down
22 changes: 10 additions & 12 deletions system-test/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ function deleteFile(file, callback) {
function writeToFile(file, contents, callback) {
var writeStream = file.createWriteStream();
writeStream.once('error', callback);
writeStream.once('complete', callback.bind(null, null));
writeStream.once('finish', callback.bind(null, null));
writeStream.end(contents);
}

Expand Down Expand Up @@ -462,7 +462,7 @@ describe('storage', function() {
writeStream.end();

writeStream.on('error', done);
writeStream.on('complete', function() {
writeStream.on('finish', function() {
var data = new Buffer('');

file.createReadStream()
Expand Down Expand Up @@ -583,8 +583,8 @@ describe('storage', function() {
fs.createReadStream(files.big.path)
.pipe(file.createWriteStream({ resumable: false }))
.on('error', done)
.on('complete', function(fileObject) {
assert.equal(fileObject.md5Hash, files.big.hash);
.on('finish', function() {
assert.equal(file.metadata.md5Hash, files.big.hash);
file.delete(done);
});
});
Expand Down Expand Up @@ -616,10 +616,10 @@ describe('storage', function() {
upload({ interrupt: true }, function(err) {
assert.ifError(err);

upload({ interrupt: false }, function(err, metadata) {
upload({ interrupt: false }, function(err) {
assert.ifError(err);

assert.equal(metadata.size, fileSize);
assert.equal(file.metadata.size, fileSize);
file.delete(done);
});
});
Expand All @@ -641,9 +641,7 @@ describe('storage', function() {
}))
.pipe(file.createWriteStream())
.on('error', callback)
.on('complete', function(fileObject) {
callback(null, fileObject);
});
.on('finish', callback);
}
});
});
Expand All @@ -661,7 +659,7 @@ describe('storage', function() {
writable.write(fileContent);
writable.end();

writable.on('complete', function() {
writable.on('finish', function() {
file.createReadStream()
.on('error', done)
.pipe(fs.createWriteStream(tmpFilePath))
Expand Down Expand Up @@ -738,7 +736,7 @@ describe('storage', function() {
fs.createReadStream(files.logo.path)
.pipe(file.createWriteStream())
.on('error', done)
.on('complete', function() {
.on('finish', function() {
file.copy(filenames[1], function(err, copiedFile) {
assert.ifError(err);
copiedFile.copy(filenames[2], done);
Expand Down Expand Up @@ -890,7 +888,7 @@ describe('storage', function() {
fs.createReadStream(files.logo.path)
.pipe(file.createWriteStream())
.on('error', done)
.on('complete', done.bind(null, null));
.on('finish', done.bind(null, null));
});

it('should create a signed read url', function(done) {
Expand Down
Loading

0 comments on commit cd9f7ba

Please sign in to comment.