From b57dc8a9e47a79eaaefb083cf176e4ff365837a0 Mon Sep 17 00:00:00 2001 From: develar Date: Sat, 3 Jun 2017 10:38:00 +0200 Subject: [PATCH] fix(deployment): s3 publisher md5 integrity --- package.json | 2 +- packages/electron-publisher-s3/package.json | 2 +- .../electron-publisher-s3/src/fdSlicer.ts | 71 ------ .../src/multipartEtag.ts | 73 ------ .../electron-publisher-s3/src/s3Publisher.ts | 2 +- .../electron-publisher-s3/src/uploader.ts | 224 ++++++------------ yarn.lock | 14 +- 7 files changed, 80 insertions(+), 308 deletions(-) delete mode 100644 packages/electron-publisher-s3/src/fdSlicer.ts delete mode 100644 packages/electron-publisher-s3/src/multipartEtag.ts diff --git a/package.json b/package.json index 3427f06f5d3..164fcb464cc 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "ajv": "^5.1.5", "ajv-keywords": "^2.1.0", "archiver": "^1.3.0", - "aws-sdk": "^2.61.0", + "aws-sdk": "^2.62.0", "bluebird-lst": "^1.0.2", "chalk": "^1.1.3", "chromium-pickle-js": "^0.2.0", diff --git a/packages/electron-publisher-s3/package.json b/packages/electron-publisher-s3/package.json index 8818fda308d..33e927f63fb 100644 --- a/packages/electron-publisher-s3/package.json +++ b/packages/electron-publisher-s3/package.json @@ -12,7 +12,7 @@ ], "dependencies": { "fs-extra-p": "^4.3.0", - "aws-sdk": "^2.61.0", + "aws-sdk": "^2.62.0", "mime": "^1.3.6", "electron-publish": "~0.0.0-semantic-release", "electron-builder-util": "~0.0.0-semantic-release" diff --git a/packages/electron-publisher-s3/src/fdSlicer.ts b/packages/electron-publisher-s3/src/fdSlicer.ts deleted file mode 100644 index 974355c75dd..00000000000 --- a/packages/electron-publisher-s3/src/fdSlicer.ts +++ /dev/null @@ -1,71 +0,0 @@ -import BluebirdPromise from "bluebird-lst" -import { EventEmitter } from "events" -import * as fs from "fs" -import { close } from "fs-extra-p" -import { Readable } from "stream" - -export class FdSlicer extends EventEmitter { - constructor(public fd: any) { - super() - } - - read(buffer: any, offset: any, length: number, position: number, callback: any) { - fs.read(this.fd, buffer, offset, length, position, callback) - } - - close() { - const fd = this.fd - return fd == null ? BluebirdPromise.resolve() : close(fd) - } -} - -export class SlicedReadStream extends Readable { - destroyed = false - - constructor(private readonly context: FdSlicer, private pos: number, private readonly end: number = -1) { - super() - - this.destroyed = false - } - - _read(n: number) { - if (this.destroyed) { - return - } - - const toRead = Math.min((this)._readableState.highWaterMark, n, this.end === -1 ? 0 : this.end - this.pos) - if (toRead <= 0) { - this.destroyed = true - this.push(null) - return - } - - if (this.destroyed) { - return - } - - const buffer = new Buffer(toRead) - fs.read(this.context.fd, buffer, 0, toRead, this.pos, (error: Error, bytesRead: number) => { - if (error != null) { - this.destroy(error) - } - else if (bytesRead === 0) { - this.destroyed = true - this.push(null) - } - else { - this.pos += bytesRead - this.push(buffer.slice(0, bytesRead)) - } - }) - } - - private destroy(error: Error) { - if (this.destroyed) { - return - } - - this.destroyed = true - this.emit("error", error) - } -} \ No newline at end of file diff --git a/packages/electron-publisher-s3/src/multipartEtag.ts b/packages/electron-publisher-s3/src/multipartEtag.ts deleted file mode 100644 index df382988d18..00000000000 --- a/packages/electron-publisher-s3/src/multipartEtag.ts +++ /dev/null @@ -1,73 +0,0 @@ -import { createHash } from "crypto" -import { Transform } from "stream" - -/* For objects uploaded via a single request to S3, the ETag is simply the hex - * string of the MD5 digest. However for multipart uploads, the ETag is more - * complicated. It is the MD5 digest of each part concatenated, and then the - * MD5 digest of *that*, followed by '-', followed by the part count. - * - * Sadly, this means there is no way to be sure whether a local file matches a - * remote object. The best we can do is hope that the software used to upload - * to S3 used a fixed part size, and that it was one of a few common sizes. - */ - -const maximumUploadSize = 5 * 1024 * 1024 * 1024 - -export class MultipartETag extends Transform { - private sum: any = { - size: maximumUploadSize, - hash: createHash("md5"), - amtWritten: 0, - digests: [], - eTag: null, - } - - bytes = 0 - private digest: Buffer | null = null - - constructor() { - super() - } - - _transform(chunk: any, encoding: string, callback: Function) { - this.bytes += chunk.length - const sumObj = this.sum - const newAmtWritten = sumObj.amtWritten + chunk.length - if (newAmtWritten <= sumObj.size) { - sumObj.amtWritten = newAmtWritten - sumObj.hash.update(chunk, encoding) - } - else { - const finalBytes = sumObj.size - sumObj.amtWritten - sumObj.hash.update(chunk.slice(0, finalBytes), encoding) - sumObj.digests.push(sumObj.hash.digest()) - sumObj.hash = createHash("md5") - sumObj.hash.update(chunk.slice(finalBytes), encoding) - sumObj.amtWritten = chunk.length - finalBytes - } - this.emit("progress") - callback(null, chunk) - } - - _flush(callback: any) { - const sumObj = this.sum - const digest = sumObj.hash.digest() - sumObj.digests.push(digest) - const finalHash = createHash("md5") - for (const digest of sumObj.digests) { - finalHash.update(digest) - } - sumObj.eTag = `${finalHash.digest("hex")}-${sumObj.digests.length}` - if (sumObj.digests.length === 1) { - this.digest = digest - } - callback(null) - } - - anyMatch(eTag: string) { - if (this.digest != null && this.digest.toString("hex") === eTag) { - return true - } - return this.sum.eTag === eTag - } -} \ No newline at end of file diff --git a/packages/electron-publisher-s3/src/s3Publisher.ts b/packages/electron-publisher-s3/src/s3Publisher.ts index 39d93ad9231..7ef0b73a7b3 100644 --- a/packages/electron-publisher-s3/src/s3Publisher.ts +++ b/packages/electron-publisher-s3/src/s3Publisher.ts @@ -51,7 +51,7 @@ export default class S3Publisher extends Publisher { const callback = new ProgressCallback(progressBar) uploader.on("progress", () => { if (!cancellationToken.cancelled) { - callback.update(uploader.progressAmount, uploader.progressTotal) + callback.update(uploader.loaded, uploader.contentLength) } }) } diff --git a/packages/electron-publisher-s3/src/uploader.ts b/packages/electron-publisher-s3/src/uploader.ts index 31f66350f0d..41792c729a8 100644 --- a/packages/electron-publisher-s3/src/uploader.ts +++ b/packages/electron-publisher-s3/src/uploader.ts @@ -2,10 +2,9 @@ import { config as awsConfig, S3 } from "aws-sdk" import BluebirdPromise from "bluebird-lst" import { createHash } from "crypto" import { EventEmitter } from "events" -import { createReadStream, open, stat } from "fs-extra-p" +import { createReadStream, stat } from "fs-extra-p" import mime from "mime" -import { FdSlicer, SlicedReadStream } from "./fdSlicer" -import { MultipartETag } from "./multipartEtag" +import { cpus } from "os" const MAX_PUT_OBJECT_SIZE = 5 * 1024 * 1024 * 1024 const MAX_MULTIPART_COUNT = 10000 @@ -54,110 +53,56 @@ export class S3Client { export class Uploader extends EventEmitter { /** @readonly */ - progressAmount = 0 - /** @readonly */ - progressTotal = 0 + loaded = 0 private cancelled = false - private fileSlicer: FdSlicer | null - - private readonly parts: Array = [] - - private slicerError: Error | null - private contentLength: number + /** @readonly */ + contentLength: number constructor(private readonly client: S3Client, private readonly s3Options: any, private readonly localFile: string) { super() } - private async openFile() { - this.progressTotal = this.contentLength - this.fileSlicer = new FdSlicer(await open(this.localFile, "r")) - this.fileSlicer.on("error", (error: Error) => { - this.cancelled = true - this.slicerError = error - }) - } - - private async closeFile() { - this.cancelled = true - if (this.fileSlicer != null) { - this.fileSlicer.close() - this.fileSlicer = null - } - } - - upload() { - return (>this._upload()) - .then(async it => { - if (this.slicerError != null) { - throw this.slicerError - } - await this.closeFile() - return it - }) - .catch(async error => { - try { - await this.closeFile() - } - catch (ignored) { - } - throw error - }) - } - - private async _upload() { - const client = this.client - + async upload() { this.contentLength = (await stat(this.localFile)).size - this.progressTotal = this.contentLength - - if (this.contentLength >= client.multipartUploadThreshold) { - await this.openFile() - let multipartUploadSize = client.multipartUploadSize - const partsRequiredCount = Math.ceil(this.contentLength / multipartUploadSize) - if (partsRequiredCount > MAX_MULTIPART_COUNT) { - multipartUploadSize = smallestPartSizeFromFileSize(this.contentLength) - } - if (multipartUploadSize > MAX_PUT_OBJECT_SIZE) { - throw new Error(`File size exceeds maximum object size: ${this.localFile}`) - } + const client = this.client + if (this.contentLength < client.multipartUploadThreshold) { + const md5 = await hashFile(this.localFile, "md5", "base64") + await this.runOrRetry(this.putObject.bind(this, md5)) + return + } - const data = await this.runOrRetry(() => client.s3.createMultipartUpload(Object.assign({ContentType: mime.lookup(this.localFile)}, this.s3Options)).promise()) - await this.multipartUpload(data.UploadId!, multipartUploadSize) + let multipartUploadSize = client.multipartUploadSize + if (Math.ceil(this.contentLength / multipartUploadSize) > MAX_MULTIPART_COUNT) { + multipartUploadSize = smallestPartSizeFromFileSize(this.contentLength) } - else { - await this.runOrRetry(this.putObject.bind(this)) + + if (multipartUploadSize > MAX_PUT_OBJECT_SIZE) { + throw new Error(`File size exceeds maximum object size: ${this.localFile}`) } + + const data = await this.runOrRetry(() => client.s3.createMultipartUpload(Object.assign({ContentType: mime.lookup(this.localFile)}, this.s3Options)).promise()) + await this.multipartUpload(data.UploadId!, multipartUploadSize) } abort() { this.cancelled = true } - private async putObject() { - this.progressAmount = 0 - - const md5 = await hashFile(this.localFile, "md5", "base64") - - await new BluebirdPromise((resolve, reject) => { - const inStream = createReadStream(this.localFile) - inStream.on("error", reject) - + private putObject(md5: string) { + this.loaded = 0 + return new BluebirdPromise((resolve, reject) => { this.client.s3.putObject(Object.assign({ ContentType: mime.lookup(this.localFile), ContentLength: this.contentLength, - Body: inStream, + Body: createReadStream(this.localFile), ContentMD5: md5, }, this.s3Options)) .on("httpUploadProgress", progress => { - this.progressAmount = progress.loaded - this.progressTotal = progress.total - if (!this.cancelled) { - this.emit("progress") - } + this.loaded = progress.loaded + this.emit("progress") }) .send((error, data) => { if (error == null) { @@ -174,6 +119,8 @@ export class Uploader extends EventEmitter { let cursor = 0 let nextPartNumber = 1 + const partsA: Array = [] + const parts: Array = [] while (cursor < this.contentLength) { const start = cursor @@ -183,84 +130,60 @@ export class Uploader extends EventEmitter { } cursor = end const part = { - ETag: null, PartNumber: nextPartNumber++, } - this.parts.push(part) - parts.push({start, end, part}) + partsA.push(part) + parts.push({start, end, part, md5: ""}) } + await BluebirdPromise.map(parts, async it => { + // hashFile - both start and end are inclusive + it.md5 = await hashFile(this.localFile, "md5", "base64", {start: it.start, end: it.end - 1}) + }, {concurrency: cpus().length}) + await BluebirdPromise.map(parts, it => this.makeUploadPart(it, uploadId), {concurrency: 4}) return await this.runOrRetry(() => this.client.s3.completeMultipartUpload({ Bucket: this.s3Options.Bucket, Key: this.s3Options.Key, UploadId: uploadId, MultipartUpload: { - Parts: this.parts, + Parts: partsA, }, }).promise() ) } - private makeUploadPart(p: Part, uploadId: string): Promise { - return this.runOrRetry(async () => { - const client = this.client - let errorOccurred = false - - const contentLength = p.end - p.start - - const multipartETag = new MultipartETag() - let prevBytes = 0 - let overallDelta = 0 - multipartETag.on("progress", () => { - if (this.cancelled || errorOccurred) { - return - } - - const delta = multipartETag.bytes - prevBytes - prevBytes = multipartETag.bytes - this.progressAmount += delta - overallDelta += delta - this.emit("progress") - }) - - const inStream = new SlicedReadStream(this.fileSlicer!, p.start, p.end) - const multipartPromise = new BluebirdPromise((resolve, reject) => { - inStream.on("error", reject) - multipartETag.on("end", resolve) - }) - .then(() => { - if (this.cancelled || errorOccurred) { - return - } - - this.progressAmount += multipartETag.bytes - prevBytes - this.progressTotal += (p.end - p.start) - multipartETag.bytes - this.emit("progress") + private makeUploadPart(part: Part, uploadId: string): Promise { + const contentLength = part.end - part.start + const client = this.client + return this.runOrRetry(() => { + let partLoaded = 0 + return new BluebirdPromise((resolve, reject) => { + client.s3.uploadPart({ + ContentLength: contentLength, + PartNumber: part.part.PartNumber, + UploadId: uploadId, + Body: createReadStream(this.localFile, {start: part.start, end: part.end - 1}), + Bucket: this.s3Options.Bucket, + Key: this.s3Options.Key, + ContentMD5: part.md5, }) - - inStream.pipe(multipartETag) - - const data = (await BluebirdPromise.all([multipartPromise, client.s3.uploadPart({ - ContentLength: contentLength, - PartNumber: p.part.PartNumber, - UploadId: uploadId, - Body: multipartETag, - Bucket: this.s3Options.Bucket, - Key: this.s3Options.Key, - }).promise() - .catch(error => { - errorOccurred = true - this.progressAmount -= overallDelta - throw error - })]))[1] - - if (!compareMultipartETag(data.ETag, multipartETag)) { - this.progressAmount -= overallDelta - throw new Error("ETag does not match MD5 checksum") - } - p.part.ETag = data.ETag - return data + .on("httpUploadProgress", progress => { + partLoaded = progress.loaded + this.loaded += progress.loaded + this.emit("progress") + }) + .send((error, data) => { + if (error == null) { + part.part.ETag = data.ETag + resolve(data) + } + else { + this.loaded -= partLoaded + reject(error) + } + }) + }) }) } @@ -296,14 +219,7 @@ interface Part { start: number end: number part: any -} - -function cleanETag(eTag: string | null | undefined) { - return eTag == null ? "" : eTag.replace(/^\s*'?\s*"?\s*(.*?)\s*"?\s*'?\s*$/, "$1") -} - -function compareMultipartETag(eTag: string | null | undefined, multipartETag: any) { - return multipartETag.anyMatch(cleanETag(eTag)) + md5: string } function smallestPartSizeFromFileSize(fileSize: number) { @@ -311,14 +227,14 @@ function smallestPartSizeFromFileSize(fileSize: number) { return partSize < MIN_MULTIPART_SIZE ? MIN_MULTIPART_SIZE : partSize } -function hashFile(file: string, algorithm: string, encoding: string = "hex") { +function hashFile(file: string, algorithm: string, encoding: string = "hex", options: any = undefined) { return new BluebirdPromise((resolve, reject) => { const hash = createHash(algorithm) hash .on("error", reject) .setEncoding(encoding) - createReadStream(file) + createReadStream(file, options) .on("error", reject) .on("end", () => { hash.end() diff --git a/yarn.lock b/yarn.lock index 99942141431..62c0b827ea0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -39,8 +39,8 @@ resolved "https://registry.yarnpkg.com/@types/node-forge/-/node-forge-0.6.9.tgz#38e0e00de8e6f3ff0bb1f4cddcad43e7b580733a" "@types/node@*": - version "7.0.26" - resolved "https://registry.yarnpkg.com/@types/node/-/node-7.0.26.tgz#e34bf70eb578d3bd962e081f4996c82b7194dddc" + version "7.0.27" + resolved "https://registry.yarnpkg.com/@types/node/-/node-7.0.27.tgz#ba5e1a87aca2b4f5817289615ffe56472927687e" "@types/source-map-support@^0.4.0": version "0.4.0" @@ -265,9 +265,9 @@ asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79" -aws-sdk@^2.61.0: - version "2.61.0" - resolved "https://registry.yarnpkg.com/aws-sdk/-/aws-sdk-2.61.0.tgz#2c104788e1696e7060c6ffaa1c6560d6f9c884af" +aws-sdk@^2.62.0: + version "2.62.0" + resolved "https://registry.yarnpkg.com/aws-sdk/-/aws-sdk-2.62.0.tgz#98508d3f93aaf3f74f18ca3b8cfe3a11bdad1a5f" dependencies: buffer "5.0.6" crypto-browserify "1.0.9" @@ -2909,8 +2909,8 @@ rimraf@^2.6.1: glob "^7.0.5" safe-buffer@^5.0.1: - version "5.0.1" - resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.0.1.tgz#d263ca54696cd8a306b5ca6551e92de57918fbe7" + version "5.1.0" + resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.0.tgz#fe4c8460397f9eaaaa58e73be46273408a45e223" sane@~1.6.0: version "1.6.0"