Tune upload perf, better cleanup logic #2532

Merged 2 commits on Nov 17, 2021
20 changes: 13 additions & 7 deletions catalog/app/containers/Bucket/PackageDialog/FilesInput.tsx
```diff
@@ -1,4 +1,5 @@
 import cx from 'classnames'
+import pLimit from 'p-limit'
 import * as R from 'ramda'
 import * as React from 'react'
 import { useDropzone, FileWithPath } from 'react-dropzone'
@@ -28,31 +29,36 @@ const COLORS = {
 
 interface FileWithHash extends File {
   hash: {
-    value: string | undefined
     ready: boolean
-    promise: Promise<string>
+    value?: string
+    error?: Error
+    promise: Promise<string | undefined>
   }
 }
 
 const hasHash = (f: File): f is FileWithHash => !!f && !!(f as FileWithHash).hash
 
+// XXX: it might make sense to limit concurrency, tho the tests show perf is ok, since hashing is async anyways
+const hashLimit = pLimit(2)
+
 function computeHash(f: File) {
   if (hasHash(f)) return f
-  const promise = PD.hashFile(f)
+  const hashP = hashLimit(PD.hashFile, f)
   const fh = f as FileWithHash
-  fh.hash = { value: undefined, ready: false, promise }
-  promise
+  fh.hash = { ready: false } as any
+  fh.hash.promise = hashP
     .catch((e) => {
       // eslint-disable-next-line no-console
-      console.log('Error hashing file:')
+      console.log(`Error hashing file "${fh.name}":`)
       // eslint-disable-next-line no-console
       console.error(e)
+      fh.hash.error = e
+      fh.hash.ready = true
+      return undefined
     })
     .then((hash) => {
       fh.hash.value = hash
       fh.hash.ready = true
+      return hash
    })
   return fh
 }
```
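The hashing change above routes every dropped file through a shared `p-limit` limiter, so at most two digests are computed at once while each file still carries its own promise and error state. A minimal standalone sketch of that pattern (the stand-in `hashFile` below uses the Web Crypto API directly and is illustrative, not the PR's `PD.hashFile`):

```ts
import pLimit from 'p-limit'

// Shared limiter: at most two hash computations run concurrently,
// mirroring `hashLimit` in the diff above.
const hashLimit = pLimit(2)

// Illustrative stand-in for PD.hashFile: SHA-256 over the whole file.
async function hashFile(file: File): Promise<string> {
  const buf = await file.arrayBuffer()
  const digest = await crypto.subtle.digest('SHA-256', buf)
  return Array.from(new Uint8Array(digest))
    .map((b) => b.toString(16).padStart(2, '0'))
    .join('')
}

// Every file is scheduled immediately and gets its own promise,
// but only two hashFile calls are in flight at any moment.
function hashAll(files: File[]): Promise<string[]> {
  return Promise.all(files.map((f) => hashLimit(hashFile, f)))
}
```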
4 changes: 2 additions & 2 deletions catalog/app/containers/Bucket/PackageDialog/PackageDialog.tsx
```diff
@@ -48,9 +48,9 @@ export const getNormalizedPath = (f: { path?: string; name: string }) => {
 }
 
 export async function hashFile(file: File) {
-  if (!window.crypto || !window.crypto.subtle || !window.crypto.subtle.digest)
-    throw new Error('Crypto API unavailable')
+  if (!window.crypto?.subtle?.digest) throw new Error('Crypto API unavailable')
   const buf = await file.arrayBuffer()
+  // XXX: consider using hashwasm for stream-based hashing to support larger files
   const hashBuf = await window.crypto.subtle.digest('SHA-256', buf)
   return Array.from(new Uint8Array(hashBuf))
     .map((b) => b.toString(16).padStart(2, '0'))
```
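The `XXX` note above points at a real limitation: `file.arrayBuffer()` loads the whole file into memory before digesting. A hedged sketch of the stream-based alternative it mentions, assuming the `hash-wasm` package's incremental `createSHA256` API and an illustrative 8 MiB chunk size:

```ts
import { createSHA256 } from 'hash-wasm'

const CHUNK_SIZE = 8 * 1024 * 1024 // illustrative choice, not specified by the PR

// Hash a File slice by slice so memory stays bounded regardless of file size.
async function hashFileStreaming(file: File): Promise<string> {
  const hasher = await createSHA256()
  hasher.init()
  for (let offset = 0; offset < file.size; offset += CHUNK_SIZE) {
    const chunk = file.slice(offset, offset + CHUNK_SIZE)
    hasher.update(new Uint8Array(await chunk.arrayBuffer()))
  }
  return hasher.digest('hex') // hex-encoded, same shape as the subtle.digest version above
}
```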
107 changes: 59 additions & 48 deletions catalog/app/containers/Bucket/PackageDialog/Uploads.tsx
```diff
@@ -1,6 +1,5 @@
 import type { S3 } from 'aws-sdk'
 import * as FP from 'fp-ts'
-import invariant from 'invariant'
 import pLimit from 'p-limit'
 import * as R from 'ramda'
 import * as React from 'react'
@@ -12,7 +11,7 @@ import useMemoEq from 'utils/useMemoEq'
 
 import type { LocalFile, ExistingFile } from './FilesInput'
 
-export interface UploadResult extends S3.ManagedUpload.SendData {
+interface UploadResult extends S3.ManagedUpload.SendData {
   VersionId: string
 }
@@ -25,8 +24,7 @@ export interface UploadTotalProgress {
 export interface UploadsState {
   [path: string]: {
     file: File
-    upload: S3.ManagedUpload
-    promise: Promise<UploadResult>
+    promise: Promise<S3.ManagedUpload.SendData>
     progress?: { total: number; loaded: number }
   }
 }
@@ -78,41 +76,52 @@ export function useUploads() {
     }) => {
       const limit = pLimit(2)
       let rejected = false
-      const uploadStates = files.map(({ path, file }) => {
-        // reuse state if file hasnt changed
-        const entry = uploads[path]
-        if (entry && entry.file === file) return { ...entry, path }
+      const pendingUploads: Record<string, S3.ManagedUpload> = {}
+
+      const uploadFile = async (path: string, file: LocalFile) => {
+        if (rejected) {
+          remove(path)
+          return undefined as never
+        }
 
-        const upload: S3.ManagedUpload = s3.upload({
-          Bucket: bucket,
-          Key: `${prefix}/${path}`,
-          Body: file,
-        })
+        const upload: S3.ManagedUpload = s3.upload(
+          {
+            Bucket: bucket,
+            Key: `${prefix}/${path}`,
+            Body: file,
+          },
+          {
+            queueSize: 2,
+          },
+        )
         upload.on('httpUploadProgress', ({ loaded }) => {
           if (rejected) return
           setUploads(R.assocPath([path, 'progress', 'loaded'], loaded))
         })
-        const promise = limit(async () => {
-          if (rejected) {
-            remove(path)
-            return undefined
-          }
-          try {
-            const uploadP = upload.promise()
-            await file.hash.promise
-            return await uploadP
-          } catch (e) {
+
+        pendingUploads[path] = upload
+
+        try {
+          const uploadP = upload.promise()
+          await file.hash.promise
+          return await uploadP
+        } catch (e) {
+          if ((e as any).code !== 'RequestAbortedError') {
             // eslint-disable-next-line no-console
             console.log(`Error uploading file "${file.name}"`)
             rejected = true
-            remove(path)
-            throw e
+            Object.values(pendingUploads).forEach((u) => u.abort())
           }
-        }) as Promise<UploadResult>
-        return { path, file, upload, promise, progress: { total: file.size, loaded: 0 } }
+          remove(path)
+          throw e
+        } finally {
+          delete pendingUploads[path]
+        }
+      }
+
+      const uploadStates = files.map(({ path, file }) => {
+        // reuse state if file hasnt changed
+        const entry = uploads[path]
+        if (entry && entry.file === file) return { ...entry, path }
+
+        const promise = limit(uploadFile, path, file)
+        return { path, file, promise, progress: { total: file.size, loaded: 0 } }
       })
 
       FP.function.pipe(
@@ -125,22 +134,24 @@
       const uploaded = await Promise.all(uploadStates.map((x) => x.promise))
 
       return FP.function.pipe(
-        FP.array.zipWith(files, uploaded, (f, r) => {
-          invariant(f.file.hash.value, 'File must have a hash')
-          return [
-            f.path,
-            {
-              physicalKey: s3paths.handleToS3Url({
-                bucket,
-                key: r.Key,
-                version: r.VersionId,
-              }),
-              size: f.file.size,
-              hash: f.file.hash.value,
-              meta: getMeta?.(f.path),
-            },
-          ] as R.KeyValuePair<string, ExistingFile>
-        }),
+        FP.array.zipWith(
+          files,
+          uploaded,
+          (f, r) =>
+            [
+              f.path,
+              {
+                physicalKey: s3paths.handleToS3Url({
+                  bucket,
+                  key: r.Key,
+                  version: (r as UploadResult).VersionId,
+                }),
+                size: f.file.size,
+                hash: f.file.hash.value,
+                meta: getMeta?.(f.path),
+              },
+            ] as R.KeyValuePair<string, ExistingFile>,
+        ),
         R.fromPairs,
       )
```
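Two ideas carry this rewrite: uploads go through `s3.upload` with `queueSize: 2` (limiting multipart part concurrency per file, on top of the two-file `pLimit`), and every in-flight `S3.ManagedUpload` is tracked in `pendingUploads` so the first genuine failure aborts all of its siblings, while the resulting `RequestAbortedError` rejections skip the abort fan-out and just clean up after themselves. The core of that cancellation pattern, reduced to a standalone sketch (the `Task` shape is illustrative, standing in for `S3.ManagedUpload`):

```ts
// A Task stands in for S3.ManagedUpload: a promise-producing job plus abort().
interface Task<T> {
  promise: () => Promise<T>
  abort: () => void
}

async function runAllOrAbort<T>(tasks: Record<string, Task<T>>): Promise<T[]> {
  const pending: Record<string, Task<T>> = {}
  let rejected = false

  const run = async (key: string): Promise<T> => {
    pending[key] = tasks[key]
    try {
      return await tasks[key].promise()
    } catch (e) {
      // Aborted siblings reject too; only the first real failure fans out.
      if (!rejected && (e as any).code !== 'RequestAbortedError') {
        rejected = true
        Object.values(pending).forEach((t) => t.abort())
      }
      throw e
    } finally {
      delete pending[key]
    }
  }

  return Promise.all(Object.keys(tasks).map(run))
}
```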
2 changes: 2 additions & 0 deletions catalog/app/utils/ResourceCache.js
```diff
@@ -148,6 +148,8 @@ function* handleInit({ resource, input, resolver }) {
 }
 
 function* cleanup() {
+  // TODO: refactor cleanup logic, so that the cleanup action is only dispatched
+  // when there's anything to cleanup (to avoid re-renders every 5 sec)
   while (true) {
     yield effects.delay(RELEASE_TIME)
     yield effects.put(Action.CleanUp({ time: new Date() }))
```

Review comment from a Member on the TODO above: 😬
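The new `TODO` describes the remaining problem: `cleanup` dispatches `CleanUp` every `RELEASE_TIME` regardless of cache contents, which forces re-renders even when there is nothing to release. One hedged way to address it would be to consult the store before dispatching; `selectCacheIsEmpty` below is a hypothetical selector, since the cache's state shape isn't shown in this diff:

```js
// Sketch only: skip the dispatch when the cache holds nothing to release.
// `selectCacheIsEmpty` is hypothetical and not part of the actual codebase.
function* cleanup() {
  while (true) {
    yield effects.delay(RELEASE_TIME)
    const isEmpty = yield effects.select(selectCacheIsEmpty)
    if (!isEmpty) yield effects.put(Action.CleanUp({ time: new Date() }))
  }
}
```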
1 change: 1 addition & 0 deletions docs/CHANGELOG.md
```diff
@@ -14,6 +14,7 @@
 ## CLI
 
 ## Catalog, Lambdas
+* [Fixed] Improve upload performance and stability, fix some hashing-related errors ([#2532](https://github.com/quiltdata/quilt/pull/2532))
 * [Added] Echarts renderer ([#2382](https://github.com/quiltdata/quilt/pull/2382))
 * [Added] Set height for `quilt_summarize.json` files ([#2474](https://github.com/quiltdata/quilt/pull/2474))
 * [Added] Add a "transcode" lambda for previewing video files ([#2366](https://github.com/quiltdata/quilt/pull/2366/))
```