From c7a4447af0830c71fb0b5d31f30ab55478f88ab0 Mon Sep 17 00:00:00 2001
From: nl_0
Date: Mon, 15 Nov 2021 15:34:01 +0500
Subject: [PATCH 1/2] tune upload perf, better cleanup logic

---
 .../Bucket/PackageDialog/FilesInput.tsx       |  20 ++--
 .../Bucket/PackageDialog/PackageDialog.tsx    |   4 +-
 .../Bucket/PackageDialog/Uploads.tsx          | 107 ++++++++++--------
 catalog/app/utils/ResourceCache.js            |   2 +
 4 files changed, 76 insertions(+), 57 deletions(-)

diff --git a/catalog/app/containers/Bucket/PackageDialog/FilesInput.tsx b/catalog/app/containers/Bucket/PackageDialog/FilesInput.tsx
index 65cbfdc672a..04362afd695 100644
--- a/catalog/app/containers/Bucket/PackageDialog/FilesInput.tsx
+++ b/catalog/app/containers/Bucket/PackageDialog/FilesInput.tsx
@@ -1,4 +1,5 @@
 import cx from 'classnames'
+import pLimit from 'p-limit'
 import * as R from 'ramda'
 import * as React from 'react'
 import { useDropzone, FileWithPath } from 'react-dropzone'
@@ -28,31 +29,36 @@ const COLORS = {

 interface FileWithHash extends File {
   hash: {
-    value: string | undefined
     ready: boolean
+    value?: string
+    error?: Error
+    promise: Promise<string | undefined>
   }
 }

 const hasHash = (f: File): f is FileWithHash => !!f && !!(f as FileWithHash).hash

-// XXX: it might make sense to limit concurrency, tho the tests show perf is ok, since hashing is async anyways
+const hashLimit = pLimit(2)
+
 function computeHash(f: File) {
   if (hasHash(f)) return f
-  const promise = PD.hashFile(f)
+  const hashP = hashLimit(PD.hashFile, f)
   const fh = f as FileWithHash
-  fh.hash = { value: undefined, ready: false, promise }
-  promise
+  fh.hash = { ready: false } as any
+  fh.hash.promise = hashP
     .catch((e) => {
       // eslint-disable-next-line no-console
-      console.log('Error hashing file:')
+      console.log(`Error hashing file "${fh.name}":`)
       // eslint-disable-next-line no-console
       console.error(e)
+      fh.hash.error = e
+      fh.hash.ready = true
       return undefined
     })
     .then((hash) => {
       fh.hash.value = hash
       fh.hash.ready = true
+      return hash
     })
   return fh
 }

diff --git a/catalog/app/containers/Bucket/PackageDialog/PackageDialog.tsx b/catalog/app/containers/Bucket/PackageDialog/PackageDialog.tsx
index 8c4bd946b25..9e172b62988 100644
--- a/catalog/app/containers/Bucket/PackageDialog/PackageDialog.tsx
+++ b/catalog/app/containers/Bucket/PackageDialog/PackageDialog.tsx
@@ -48,9 +48,9 @@ export const getNormalizedPath = (f: { path?: string; name: string }) => {
 }

 export async function hashFile(file: File) {
-  if (!window.crypto || !window.crypto.subtle || !window.crypto.subtle.digest)
-    throw new Error('Crypto API unavailable')
+  if (!window.crypto?.subtle?.digest) throw new Error('Crypto API unavailable')
   const buf = await file.arrayBuffer()
+  // XXX: consider using hashwasm for stream-based hashing to support larger files
   const hashBuf = await window.crypto.subtle.digest('SHA-256', buf)
   return Array.from(new Uint8Array(hashBuf))
     .map((b) => b.toString(16).padStart(2, '0'))
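The XXX note just above suggests hashwasm for stream-based hashing. For reference, a minimal sketch of what that could look like — assuming the hash-wasm package and its incremental createSHA256 API; none of this is part of the patch:

    import { createSHA256 } from 'hash-wasm'

    // Hash a File chunk by chunk instead of buffering the whole thing
    // with arrayBuffer(), so very large files can be hashed as well.
    async function hashFileStreaming(file: File): Promise<string> {
      const hasher = await createSHA256()
      hasher.init()
      const reader = file.stream().getReader()
      for (;;) {
        const { done, value } = await reader.read()
        if (done) break
        if (value) hasher.update(value)
      }
      return hasher.digest('hex')
    }

Combined with the pLimit(2) gate introduced in FilesInput.tsx above, at most two such digests would run at once.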
diff --git a/catalog/app/containers/Bucket/PackageDialog/Uploads.tsx b/catalog/app/containers/Bucket/PackageDialog/Uploads.tsx
index 0d95a1570b2..1faa2961cf3 100644
--- a/catalog/app/containers/Bucket/PackageDialog/Uploads.tsx
+++ b/catalog/app/containers/Bucket/PackageDialog/Uploads.tsx
@@ -1,6 +1,5 @@
 import type { S3 } from 'aws-sdk'
 import * as FP from 'fp-ts'
-import invariant from 'invariant'
 import pLimit from 'p-limit'
 import * as R from 'ramda'
 import * as React from 'react'
@@ -12,7 +11,7 @@ import useMemoEq from 'utils/useMemoEq'

 import type { LocalFile, ExistingFile } from './FilesInput'

-export interface UploadResult extends S3.ManagedUpload.SendData {
+interface UploadResult extends S3.ManagedUpload.SendData {
   VersionId: string
 }

@@ -25,8 +24,7 @@ export interface UploadTotalProgress {
 export interface UploadsState {
   [path: string]: {
     file: File
-    upload: S3.ManagedUpload
-    promise: Promise<UploadResult>
+    promise: Promise<S3.ManagedUpload.SendData>
     progress?: { total: number; loaded: number }
   }
 }

@@ -78,41 +76,52 @@ export function useUploads() {
     }) => {
       const limit = pLimit(2)
       let rejected = false
-      const uploadStates = files.map(({ path, file }) => {
-        // reuse state if file hasnt changed
-        const entry = uploads[path]
-        if (entry && entry.file === file) return { ...entry, path }
+      const pendingUploads: Record<string, S3.ManagedUpload> = {}
+
+      const uploadFile = async (path: string, file: LocalFile) => {
+        if (rejected) {
+          remove(path)
+          return undefined as never
+        }
+
+        const upload: S3.ManagedUpload = s3.upload({
+          Bucket: bucket,
+          Key: `${prefix}/${path}`,
+          Body: file,
+        })

-        const upload: S3.ManagedUpload = s3.upload(
-          {
-            Bucket: bucket,
-            Key: `${prefix}/${path}`,
-            Body: file,
-          },
-          {
-            queueSize: 2,
-          },
-        )
         upload.on('httpUploadProgress', ({ loaded }) => {
           if (rejected) return
           setUploads(R.assocPath([path, 'progress', 'loaded'], loaded))
         })
-        const promise = limit(async () => {
-          if (rejected) {
-            remove(path)
-            return undefined
-          }
-          try {
-            const uploadP = upload.promise()
-            await file.hash.promise
-            return await uploadP
-          } catch (e) {
+
+        pendingUploads[path] = upload
+
+        try {
+          const uploadP = upload.promise()
+          await file.hash.promise
+          return await uploadP
+        } catch (e) {
+          if ((e as any).code !== 'RequestAbortedError') {
+            // eslint-disable-next-line no-console
+            console.log(`Error uploading file "${file.name}"`)
             rejected = true
-            remove(path)
-            throw e
+            Object.values(pendingUploads).forEach((u) => u.abort())
           }
-        }) as Promise<UploadResult>
-        return { path, file, upload, promise, progress: { total: file.size, loaded: 0 } }
+          remove(path)
+          throw e
+        } finally {
+          delete pendingUploads[path]
+        }
+      }
+
+      const uploadStates = files.map(({ path, file }) => {
+        // reuse state if file hasn't changed
+        const entry = uploads[path]
+        if (entry && entry.file === file) return { ...entry, path }
+
+        const promise = limit(uploadFile, path, file)
+        return { path, file, promise, progress: { total: file.size, loaded: 0 } }
       })

       FP.function.pipe(
@@ -125,22 +134,24 @@
       const uploaded = await Promise.all(uploadStates.map((x) => x.promise))

       return FP.function.pipe(
-        FP.array.zipWith(files, uploaded, (f, r) => {
-          invariant(f.file.hash.value, 'File must have a hash')
-          return [
-            f.path,
-            {
-              physicalKey: s3paths.handleToS3Url({
-                bucket,
-                key: r.Key,
-                version: r.VersionId,
-              }),
-              size: f.file.size,
-              hash: f.file.hash.value,
-              meta: getMeta?.(f.path),
-            },
-          ] as R.KeyValuePair<string, ExistingFile>
-        }),
+        FP.array.zipWith(
+          files,
+          uploaded,
+          (f, r) =>
+            [
+              f.path,
+              {
+                physicalKey: s3paths.handleToS3Url({
+                  bucket,
+                  key: r.Key,
+                  version: (r as UploadResult).VersionId,
+                }),
+                size: f.file.size,
+                hash: f.file.hash.value,
+                meta: getMeta?.(f.path),
+              },
+            ] as R.KeyValuePair<string, ExistingFile>,
+        ),
         R.fromPairs,
       )
     },
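The pendingUploads registry introduced above is the heart of the stability change: the first real failure flips rejected and aborts every sibling upload still in flight, while aborted siblings are recognized by their RequestAbortedError code so they don't re-trigger the abort fan-out. The same pattern in isolation, as a rough sketch — uploadAll and its parameters are illustrative, not part of the patch:

    import type { S3 } from 'aws-sdk'
    import pLimit from 'p-limit'

    // Upload entries with bounded concurrency; the first failure aborts
    // everything still in flight so no orphaned transfers keep running.
    async function uploadAll(s3: S3, bucket: string, entries: { key: string; body: Blob }[]) {
      const limit = pLimit(2)
      const pending: Record<string, S3.ManagedUpload> = {}
      let failed = false
      return Promise.all(
        entries.map(({ key, body }) =>
          limit(async () => {
            if (failed) throw new Error('upload cancelled')
            const upload = s3.upload({ Bucket: bucket, Key: key, Body: body })
            pending[key] = upload
            try {
              return await upload.promise()
            } catch (e) {
              failed = true
              // abort() surfaces as a RequestAbortedError in each sibling's promise()
              Object.values(pending).forEach((u) => u.abort())
              throw e
            } finally {
              delete pending[key]
            }
          }),
        ),
      )
    }

Dropping the queueSize: 2 option also keeps total concurrency at the single pLimit(2) gate; with it, each of the two active uploads could presumably have transferred two parts at once.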
diff --git a/catalog/app/utils/ResourceCache.js b/catalog/app/utils/ResourceCache.js
index 9f783261177..493c5f399ee 100644
--- a/catalog/app/utils/ResourceCache.js
+++ b/catalog/app/utils/ResourceCache.js
@@ -148,6 +148,8 @@ function* handleInit({ resource, input, resolver }) {
 }

 function* cleanup() {
+  // TODO: refactor cleanup logic, so that the cleanup action is only dispatched
+  // when there's anything to cleanup (to avoid re-renders every 5 sec)
   while (true) {
     yield effects.delay(RELEASE_TIME)
     yield effects.put(Action.CleanUp({ time: new Date() }))

From 4fdf92a87f4a629a0827722c52cc5adb872606fc Mon Sep 17 00:00:00 2001
From: nl_0
Date: Wed, 17 Nov 2021 11:30:27 +0500
Subject: [PATCH 2/2] changelog entry

---
 docs/CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 3953cc985bd..dbd57181f1a 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -14,6 +14,7 @@
 ## CLI

 ## Catalog, Lambdas
+* [Fixed] Improve upload performance and stability, fix some hashing-related errors ([#2532](https://github.com/quiltdata/quilt/pull/2532))
 * [Added] Echarts renderer ([#2382](https://github.com/quiltdata/quilt/pull/2382))
 * [Added] Set height for `quilt_summarize.json` files ([#2474](https://github.com/quiltdata/quilt/pull/2474))
 * [Added] Add a "transcode" lambda for previewing video files ([#2366](https://github.com/quiltdata/quilt/pull/2366/))
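The TODO added to cleanup() describes the intended follow-up: keep polling on the same interval, but dispatch CleanUp only when something is actually collectable, so an idle app is not re-rendered every RELEASE_TIME. One possible shape of that refactor, as a sketch only — the CacheEntry layout, selectEntries selector, and action shape below are hypothetical, not part of this patch:

    import * as effects from 'redux-saga/effects'

    const RELEASE_TIME = 5000

    // Hypothetical entry shape: unclaimed entries remember when they were
    // released so the saga can tell whether anything is actually stale.
    interface CacheEntry {
      claimed: number
      releasedAt?: number
    }

    const isStale = (e: CacheEntry, now: number) =>
      !e.claimed && e.releasedAt != null && now - e.releasedAt > RELEASE_TIME

    // Hypothetical selector over the resource-cache slice of the store.
    const selectEntries = (state: any): CacheEntry[] =>
      Object.values(state.resourceCache ?? {})

    function* cleanup() {
      while (true) {
        yield effects.delay(RELEASE_TIME)
        const entries: CacheEntry[] = yield effects.select(selectEntries)
        const now = Date.now()
        // Only dispatch when there is something to collect, so idle apps
        // don't take a store update (and a re-render) every RELEASE_TIME.
        if (entries.some((e) => isStale(e, now))) {
          yield effects.put({ type: 'ResourceCache/CleanUp', time: new Date(now) })
        }
      }
    }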