Skip to content

Commit

Permalink
Companion throttle progress by time (#4101)
Browse files Browse the repository at this point in the history
* throttle progress events by time

instead of value
when uploading large files on fast connections, sometimes
a LOT of events will be emitted. (with distinct byte values)
this causes logs to flood and a lot of events to be sent to the client
slowing down everything

* fix tests

final progress is no longer guaranteed to be emitted
  • Loading branch information
mifi authored Sep 13, 2022
1 parent 7970cf5 commit 1b5c0d7
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 35 deletions.
1 change: 1 addition & 0 deletions packages/@uppy/companion/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"ipaddr.js": "^2.0.1",
"jsonwebtoken": "8.5.1",
"lodash.merge": "^4.6.2",
"lodash.throttle": "^4.1.1",
"mime-types": "2.1.35",
"moment": "^2.29.2",
"moment-timezone": "^0.5.31",
Expand Down
37 changes: 20 additions & 17 deletions packages/@uppy/companion/src/server/Uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const { join } = require('node:path')
const fs = require('node:fs')
const { promisify } = require('node:util')
const FormData = require('form-data')
const throttle = require('lodash.throttle')

// TODO move to `require('streams/promises').pipeline` when dropping support for Node.js 14.x.
const pipeline = promisify(pipelineCb)
Expand Down Expand Up @@ -165,7 +166,6 @@ class Uploader {

this.uploadStopped = false

this.emittedProgress = {}
this.storage = options.storage
this._paused = false

Expand Down Expand Up @@ -281,10 +281,14 @@ class Uploader {
} finally {
logger.debug('cleanup', this.shortToken)
if (this.readStream && !this.readStream.destroyed) this.readStream.destroy()
if (this.tmpPath) unlink(this.tmpPath).catch(() => {})
await this.tryDeleteTmpPath()
}
}

tryDeleteTmpPath () {
if (this.tmpPath) unlink(this.tmpPath).catch(() => {})
}

/**
*
* @param {import('stream').Readable} stream
Expand Down Expand Up @@ -408,6 +412,17 @@ class Uploader {
this.storage.set(redisKey, jsonStringify(state), 'EX', keyExpirySec)
}

throttledEmitProgress = throttle((dataToEmit) => {
const { bytesUploaded, bytesTotal, progress } = dataToEmit.payload
logger.debug(
`${bytesUploaded} ${bytesTotal} ${progress}%`,
'uploader.total.progress',
this.shortToken,
)
this.saveState(dataToEmit)
emitter().emit(this.token, dataToEmit)
}, 1000, { trailing: false })

/**
*
* @param {number} [bytesUploaded]
Expand All @@ -428,11 +443,6 @@ class Uploader {
if (bytesTotal > 0) percentage = Math.min(Math.max(0, ((combinedBytes / bytesTotal) * 100)), 100)

const formattedPercentage = percentage.toFixed(2)
logger.debug(
`${combinedBytes} ${bytesTotal} ${formattedPercentage}%`,
'uploader.total.progress',
this.shortToken,
)

if (this._paused || this.uploadStopped) {
return
Expand All @@ -443,17 +453,10 @@ class Uploader {
action: 'progress',
payload,
}
this.saveState(dataToEmit)

const isEqual = (p1, p2) => (p1.progress === p2.progress
&& p1.bytesUploaded === p2.bytesUploaded
&& p1.bytesTotal === p2.bytesTotal)

// avoid flooding the client with progress events.
if (!isEqual(this.emittedProgress, payload)) {
this.emittedProgress = payload
emitter().emit(this.token, dataToEmit)
}
// avoid flooding the client (and log) with progress events.
// flooding will cause reduced performance and possibly network issues
this.throttledEmitProgress(dataToEmit)
}

/**
Expand Down
36 changes: 18 additions & 18 deletions packages/@uppy/companion/test/__tests__/uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ describe('uploader with tus protocol', () => {
const uploadToken = uploader.token
expect(uploadToken).toBeTruthy()

let progressReceived = 0
let firstReceivedProgress

const onProgress = jest.fn()
const onUploadSuccess = jest.fn()
Expand All @@ -80,14 +80,14 @@ describe('uploader with tus protocol', () => {
// emulate socket connection
socketClient.connect(uploadToken)
socketClient.onProgress(uploadToken, (message) => {
progressReceived = message.payload.bytesUploaded
if (firstReceivedProgress == null) firstReceivedProgress = message.payload.bytesUploaded
onProgress(message)
})
socketClient.onUploadSuccess(uploadToken, onUploadSuccess)
await promise
await uploader.tryUploadStream(stream)

expect(progressReceived).toBe(fileContent.length)
expect(firstReceivedProgress).toBe(8)

expect(onProgress).toHaveBeenLastCalledWith(expect.objectContaining({
payload: expect.objectContaining({
Expand Down Expand Up @@ -118,6 +118,15 @@ describe('uploader with tus protocol', () => {
}

const uploader = new Uploader(opts)
uploader.tryDeleteTmpPath = async () => {
// validate that the tmp file has been downloaded and saved into the file path
// must do it before it gets deleted
const fileInfo = fs.statSync(uploader.tmpPath)
expect(fileInfo.isFile()).toBe(true)
expect(fileInfo.size).toBe(fileContent.length)

return uploader.tryDeleteTmpPath()
}
const uploadToken = uploader.token
expect(uploadToken).toBeTruthy()

Expand All @@ -134,27 +143,18 @@ describe('uploader with tus protocol', () => {
})
})

let progressReceived = 0
let firstReceivedProgress

// emulate socket connection
socketClient.connect(uploadToken)
socketClient.onProgress(uploadToken, (message) => {
// validate that the file has been downloaded and saved into the file path
try {
progressReceived = message.payload.bytesUploaded

if (progressReceived === fileContent.length) {
const fileInfo = fs.statSync(uploader.tmpPath)
expect(fileInfo.isFile()).toBe(true)
expect(fileInfo.size).toBe(fileContent.length)
expect(message.payload.bytesTotal).toBe(fileContent.length)
}
} catch (err) {
reject(err)
}
if (firstReceivedProgress == null) firstReceivedProgress = message.payload.bytesUploaded
})
socketClient.onUploadSuccess(uploadToken, (message) => {
try {
expect(progressReceived).toBe(fileContent.length)
expect(message.payload.bytesTotal).toBe(fileContent.length)

expect(firstReceivedProgress).toBe(8192)
// see __mocks__/tus-js-client.js
expect(message.payload.url).toBe('https://tus.endpoint/files/foo-bar')
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8387,6 +8387,7 @@ __metadata:
jest: ^29.0.0
jsonwebtoken: 8.5.1
lodash.merge: ^4.6.2
lodash.throttle: ^4.1.1
mime-types: 2.1.35
moment: ^2.29.2
moment-timezone: ^0.5.31
Expand Down

0 comments on commit 1b5c0d7

Please sign in to comment.