From 503134292b379193116d6722412d73155601c759 Mon Sep 17 00:00:00 2001 From: seaerchin Date: Tue, 1 Jun 2021 20:11:36 +0800 Subject: [PATCH 1/7] feat(ndjsonstream): refactored ndjsonstream and removed old js version --- .../modules/forms/helpers/ndjsonStream.js | 70 ------------------- .../modules/forms/helpers/ndjsonStream.ts | 58 +++++++++++++++ 2 files changed, 58 insertions(+), 70 deletions(-) delete mode 100644 src/public/modules/forms/helpers/ndjsonStream.js create mode 100644 src/public/modules/forms/helpers/ndjsonStream.ts diff --git a/src/public/modules/forms/helpers/ndjsonStream.js b/src/public/modules/forms/helpers/ndjsonStream.js deleted file mode 100644 index 4df867a980..0000000000 --- a/src/public/modules/forms/helpers/ndjsonStream.js +++ /dev/null @@ -1,70 +0,0 @@ -/* eslint-disable camelcase */ -'use strict' - -// Modified fork of https://github.com/canjs/can-ndjson-stream to enqueue -// the string immediately without a JSON.parse() step, as the stream payload -// is to be decrypted by the decryption worker. - -// Note that this code assumes a polyfill of TextDecoder is available to run in IE11. - -const ndjsonStream = function (response) { - // For cancellation - var is_reader - var cancellationRequest = false - return new ReadableStream({ - start: function (controller) { - var reader = response.getReader() - is_reader = reader - var decoder = new TextDecoder() - var data_buf = '' - - reader.read().then(function processResult(result) { - if (result.done) { - if (cancellationRequest) { - // Immediately exit - return - } - - data_buf = data_buf.trim() - if (data_buf.length !== 0) { - try { - controller.enqueue(data_buf) - } catch (e) { - controller.error(e) - return - } - } - controller.close() - return - } - - var data = decoder.decode(result.value, { stream: true }) - data_buf += data - var lines = data_buf.split('\n') - for (var i = 0; i < lines.length - 1; ++i) { - var l = lines[i].trim() - if (l.length > 0) { - try { - controller.enqueue(l) - } catch (e) { - controller.error(e) - cancellationRequest = true - reader.cancel() - return - } - } - } - data_buf = lines[lines.length - 1] - - return reader.read().then(processResult) - }) - }, - cancel: function (reason) { - console.log('Cancel registered due to ', reason) - cancellationRequest = true - is_reader.cancel() - }, - }) -} - -module.exports = ndjsonStream diff --git a/src/public/modules/forms/helpers/ndjsonStream.ts b/src/public/modules/forms/helpers/ndjsonStream.ts new file mode 100644 index 0000000000..f7ce671c9e --- /dev/null +++ b/src/public/modules/forms/helpers/ndjsonStream.ts @@ -0,0 +1,58 @@ +// Modified fork of https://github.com/canjs/can-ndjson-stream to enqueue +// the string immediately without a JSON.parse() step, as the stream payload +// is to be decrypted by the decryption worker. + +// Note that this code assumes a polyfill of TextDecoder is available to run in IE11. + +export const ndjsonStream = ( + response: ReadableStream, +): ReadableStream => { + // For cancellation + let maybeReader: ReadableStreamDefaultReader | undefined + let shouldCancel = false + return new ReadableStream({ + start: function (controller) { + const reader = response.getReader() + maybeReader = reader + const decoder = new TextDecoder() + + return reader + .read() + .then(function processResult(result): Promise | undefined | void { + if (shouldCancel) { + return + } + + if (result.done) { + return controller.close() + } + + // Read the input in as a stream and split by newline and trim + const lines = decoder + .decode(result.value, { stream: true }) + .split('\n') + .map((line) => line.trim()) + + // Only append if there is content available + lines.forEach((line) => { + if (line) { + try { + controller.enqueue(line) + } catch (e) { + controller.error(e) + shouldCancel = true + return reader.cancel() + } + } + }) + + return reader.read().then(processResult) + }) + }, + cancel: function (reason) { + console.log('Cancel registered due to ', reason) + shouldCancel = true + maybeReader && maybeReader.cancel() + }, + }) +} From a3c6885183d8d7c53608371774ca69d79dafb37b Mon Sep 17 00:00:00 2001 From: seaerchin Date: Tue, 1 Jun 2021 20:12:17 +0800 Subject: [PATCH 2/7] refactor(submissions.client.factory): updated callsite to use new version of ndjsonstream --- src/public/modules/forms/services/submissions.client.factory.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/public/modules/forms/services/submissions.client.factory.js b/src/public/modules/forms/services/submissions.client.factory.js index a7296b6124..f3a64ba286 100644 --- a/src/public/modules/forms/services/submissions.client.factory.js +++ b/src/public/modules/forms/services/submissions.client.factory.js @@ -3,7 +3,7 @@ const CsvMHGenerator = require('../helpers/CsvMergedHeadersGenerator') const DecryptionWorker = require('../helpers/decryption.worker.js') const { fixParamsToUrl, triggerFileDownload } = require('../helpers/util') -const ndjsonStream = require('../helpers/ndjsonStream') +const { ndjsonStream } = require('../helpers/ndjsonStream') const fetchStream = require('fetch-readablestream') const { decode: decodeBase64 } = require('@stablelib/base64') const JSZip = require('jszip') From 0c8decd0846bcdf2ab6817c1fd8678d038e54c09 Mon Sep 17 00:00:00 2001 From: seaerchin Date: Wed, 2 Jun 2021 14:20:40 +0800 Subject: [PATCH 3/7] revert(ndjsonstream): adds data_buf back to prevent regression --- .../modules/forms/helpers/ndjsonStream.ts | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/public/modules/forms/helpers/ndjsonStream.ts b/src/public/modules/forms/helpers/ndjsonStream.ts index f7ce671c9e..2e672058dd 100644 --- a/src/public/modules/forms/helpers/ndjsonStream.ts +++ b/src/public/modules/forms/helpers/ndjsonStream.ts @@ -15,6 +15,7 @@ export const ndjsonStream = ( const reader = response.getReader() maybeReader = reader const decoder = new TextDecoder() + let data_buf = '' return reader .read() @@ -24,14 +25,22 @@ export const ndjsonStream = ( } if (result.done) { + data_buf = data_buf.trim() + if (data_buf.length !== 0) { + try { + controller.enqueue(data_buf) + } catch (e) { + controller.error(e) + return + } + } return controller.close() } // Read the input in as a stream and split by newline and trim - const lines = decoder - .decode(result.value, { stream: true }) - .split('\n') - .map((line) => line.trim()) + const data = decoder.decode(result.value, { stream: true }) + data_buf += data + const lines = data_buf.split('\n').map((line) => line.trim()) // Only append if there is content available lines.forEach((line) => { @@ -46,6 +55,7 @@ export const ndjsonStream = ( } }) + data_buf = lines[lines.length - 1] return reader.read().then(processResult) }) }, From 796d457e7074ef137428f1d25c77506f243a3da0 Mon Sep 17 00:00:00 2001 From: seaerchin Date: Fri, 4 Jun 2021 02:27:48 +0800 Subject: [PATCH 4/7] fix(ndjsonstream): fixed errorneous initial conditional --- src/public/modules/forms/helpers/ndjsonStream.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/public/modules/forms/helpers/ndjsonStream.ts b/src/public/modules/forms/helpers/ndjsonStream.ts index 2e672058dd..d47d2dc8d3 100644 --- a/src/public/modules/forms/helpers/ndjsonStream.ts +++ b/src/public/modules/forms/helpers/ndjsonStream.ts @@ -20,7 +20,7 @@ export const ndjsonStream = ( return reader .read() .then(function processResult(result): Promise | undefined | void { - if (shouldCancel) { + if (result.done && shouldCancel) { return } @@ -38,8 +38,7 @@ export const ndjsonStream = ( } // Read the input in as a stream and split by newline and trim - const data = decoder.decode(result.value, { stream: true }) - data_buf += data + data_buf += decoder.decode(result.value, { stream: true }) const lines = data_buf.split('\n').map((line) => line.trim()) // Only append if there is content available @@ -56,13 +55,16 @@ export const ndjsonStream = ( }) data_buf = lines[lines.length - 1] + return reader.read().then(processResult) }) }, cancel: function (reason) { console.log('Cancel registered due to ', reason) shouldCancel = true - maybeReader && maybeReader.cancel() + if (maybeReader) { + return maybeReader.cancel() + } }, }) } From b188fde9516b05229ec0cb9b2c7506f119d04154 Mon Sep 17 00:00:00 2001 From: seaerchin Date: Sun, 6 Jun 2021 23:04:43 +0800 Subject: [PATCH 5/7] fix(ndjsontream): fixed insidious bug causing occasional off by 1 --- src/public/modules/forms/helpers/ndjsonStream.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/public/modules/forms/helpers/ndjsonStream.ts b/src/public/modules/forms/helpers/ndjsonStream.ts index d47d2dc8d3..ffc721cf5e 100644 --- a/src/public/modules/forms/helpers/ndjsonStream.ts +++ b/src/public/modules/forms/helpers/ndjsonStream.ts @@ -39,10 +39,12 @@ export const ndjsonStream = ( // Read the input in as a stream and split by newline and trim data_buf += decoder.decode(result.value, { stream: true }) - const lines = data_buf.split('\n').map((line) => line.trim()) + const lines = data_buf.split('\n') + const readableLines = lines.slice(0, -1).map((line) => line.trim()) + const bufferStore = lines[lines.length - 1] // Only append if there is content available - lines.forEach((line) => { + readableLines.forEach((line) => { if (line) { try { controller.enqueue(line) @@ -54,7 +56,7 @@ export const ndjsonStream = ( } }) - data_buf = lines[lines.length - 1] + data_buf = bufferStore return reader.read().then(processResult) }) From 7318a6018f9332869c1d5256d955b8caac0c0f13 Mon Sep 17 00:00:00 2001 From: seaerchin Date: Mon, 7 Jun 2021 17:48:26 +0800 Subject: [PATCH 6/7] revert(ndjsonstream): reverts slice/map to for loops --- .../modules/forms/helpers/ndjsonStream.ts | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/public/modules/forms/helpers/ndjsonStream.ts b/src/public/modules/forms/helpers/ndjsonStream.ts index ffc721cf5e..bc5f775b78 100644 --- a/src/public/modules/forms/helpers/ndjsonStream.ts +++ b/src/public/modules/forms/helpers/ndjsonStream.ts @@ -40,23 +40,22 @@ export const ndjsonStream = ( // Read the input in as a stream and split by newline and trim data_buf += decoder.decode(result.value, { stream: true }) const lines = data_buf.split('\n') - const readableLines = lines.slice(0, -1).map((line) => line.trim()) - const bufferStore = lines[lines.length - 1] - // Only append if there is content available - readableLines.forEach((line) => { - if (line) { + // Reads in every line BUT the last + // Trims the line and queues it in the controller if there is content in the line + for (let i = 0; i < lines.length - 1; ++i) { + const l = lines[i].trim() + if (l.length > 0) { try { - controller.enqueue(line) + controller.enqueue(l) } catch (e) { controller.error(e) shouldCancel = true return reader.cancel() } } - }) - - data_buf = bufferStore + } + data_buf = lines[lines.length - 1] return reader.read().then(processResult) }) From eda29732e69668199c0b3d609e32a276b652b1be Mon Sep 17 00:00:00 2001 From: seaerchin Date: Tue, 8 Jun 2021 11:04:30 +0800 Subject: [PATCH 7/7] revert(ndjsonstream): reverts single line return --- src/public/modules/forms/helpers/ndjsonStream.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/public/modules/forms/helpers/ndjsonStream.ts b/src/public/modules/forms/helpers/ndjsonStream.ts index bc5f775b78..829cb3c91c 100644 --- a/src/public/modules/forms/helpers/ndjsonStream.ts +++ b/src/public/modules/forms/helpers/ndjsonStream.ts @@ -34,7 +34,8 @@ export const ndjsonStream = ( return } } - return controller.close() + controller.close() + return } // Read the input in as a stream and split by newline and trim @@ -51,7 +52,8 @@ export const ndjsonStream = ( } catch (e) { controller.error(e) shouldCancel = true - return reader.cancel() + void reader.cancel() + return } } } @@ -64,7 +66,7 @@ export const ndjsonStream = ( console.log('Cancel registered due to ', reason) shouldCancel = true if (maybeReader) { - return maybeReader.cancel() + void maybeReader.cancel() } }, })