From fedc0d317845596cc1bdffc27376df998935191b Mon Sep 17 00:00:00 2001
From: Anthony Heber
Date: Wed, 30 Mar 2022 15:08:23 -0600
Subject: [PATCH] fix: bulk batching wide files (cli/#1460)

---
 package.json                  |  1 +
 src/batcher.ts                | 20 ++++++++++++++++++--
 test/commands/batcher.test.ts | 34 ++++++++++++++++++++++++++++++++++
 yarn.lock                     |  5 +++++
 4 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/package.json b/package.json
index 41cc46e4f..7c74106bb 100644
--- a/package.json
+++ b/package.json
@@ -89,6 +89,7 @@
     "@salesforce/ts-types": "^1.5.20",
     "chalk": "^4.1.0",
     "csv-parse": "^4.16.3",
+    "csv-stringify": "^6.0.5",
     "tslib": "^2"
   },
   "devDependencies": {
diff --git a/src/batcher.ts b/src/batcher.ts
index b964903a9..9a5c22a05 100644
--- a/src/batcher.ts
+++ b/src/batcher.ts
@@ -10,8 +10,12 @@ import parse = require('csv-parse');
 import { Batch, BatchInfo, BatchResultInfo, JobInfo } from 'jsforce';
 import { UX } from '@salesforce/command';
 import { Job } from 'jsforce/job';
+import { stringify } from 'csv-stringify/sync';
 
+// max rows per file in Bulk 1.0
 const BATCH_RECORDS_LIMIT = 10000;
+// max characters/bytes per file in Bulk 1.0
+const BATCH_BYTES_LIMIT = 10000000;
 const POLL_FREQUENCY_MS = 5000;
 
 Messages.importMessagesDirectory(__dirname);
@@ -295,6 +299,8 @@ export class Batcher {
     // split all records into batches
     const batches: Batches = [];
     let batchIndex = 0;
+    let batchBytes = 0;
+    let batchHeaderBytes = 0;
     batches[batchIndex] = [];
 
     return await new Promise((resolve, reject) => {
@@ -309,14 +315,24 @@
       readStream.pipe(parser);
 
       parser.on('data', (element: BatchEntry) => {
-        batches[batchIndex].push(element);
-        if (batches[batchIndex].length === BATCH_RECORDS_LIMIT) {
+        if (!batchHeaderBytes) {
+          // capture header byte length
+          batchHeaderBytes = Buffer.byteLength(stringify([Object.keys(element)]) + '\n', 'utf8');
+          batchBytes = batchHeaderBytes;
+        }
+        // capture row byte length
+        const rowBytes = Buffer.byteLength(stringify([Object.values(element)]) + '\n', 'utf8');
+        if (batches[batchIndex].length === BATCH_RECORDS_LIMIT || rowBytes + batchBytes > BATCH_BYTES_LIMIT) {
           // TODO: we can start processing this batch here
           // we need event listeners to remove all of the `await new Promise`
           // next batch
           batchIndex++;
           batches[batchIndex] = [];
+          // reset file size to just the headers
+          batchBytes = batchHeaderBytes;
         }
+        batchBytes += rowBytes;
+        batches[batchIndex].push(element);
       });
 
       parser.on('error', (err) => {
diff --git a/test/commands/batcher.test.ts b/test/commands/batcher.test.ts
index 126a4d348..65d4dafb5 100644
--- a/test/commands/batcher.test.ts
+++ b/test/commands/batcher.test.ts
@@ -107,6 +107,40 @@
     expect(exitSpy.notCalled).to.equal(true);
   });
 
+  it('Should not create another batch for exactly 10000 records', async () => {
+    inStream.push(`name , field1, field2${os.EOL}`);
+    for (let i = 0; i < 10000; i++) {
+      inStream.push(`obj1,val1,val2${os.EOL}`);
+    }
+    inStream.push(null);
+    // @ts-ignore private method
+    const batches = batcher.splitIntoBatches(inStream);
+    const result = await batches;
+    expect(result.length).to.equal(1);
+    expect(result[0].length).to.equal(10000);
+    expect(exitSpy.notCalled).to.equal(true);
+  });
+
+  it('Should create another batch after 10MB of data', async () => {
+    // generate a large string for use as a field value
+    let bigField = '';
+    while (bigField.length <= 2000) {
+      bigField += 'l';
+    }
+    inStream.push(`"n""am""e",field1,field2${os.EOL}`);
+    for (let i = 0; i < 5000; i++) {
+      inStream.push(`obj1,"v""al""1",${bigField}${os.EOL}`);
+    }
+    inStream.push(null);
+    // @ts-ignore private method
+    const batches = batcher.splitIntoBatches(inStream);
+    const result = await batches;
+    expect(result.length).to.equal(2);
+    expect(result[0].length).to.equal(4952);
+    expect(result[1].length).to.equal(48);
+    expect(exitSpy.notCalled).to.equal(true);
+  });
+
   it('should be able to read through line breaks within fields', async () => {
     inStream.push(`name,field1,field2${os.EOL}`);
     inStream.push(`obj1,"val1\n\nval1","\nval2"${os.EOL}`);
diff --git a/yarn.lock b/yarn.lock
index 97fcf89c8..3e5707736 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -1911,6 +1911,11 @@ csv-stringify@^1.0.4:
   dependencies:
     lodash.get "~4.4.2"
 
+csv-stringify@^6.0.5:
+  version "6.0.5"
+  resolved "https://registry.yarnpkg.com/csv-stringify/-/csv-stringify-6.0.5.tgz#3474a4fe784249eb5c91d455f616e1f70961cdc0"
+  integrity sha512-7xpV3uweJCFF/Ssn56l3xsR/k2r3UqszwjEhej9qEn2cCPzyK1WyHCgoUVzBA792x8HbwonNX7CU9XM2K5s5yw==
+
 cz-conventional-changelog@3.2.0:
   version "3.2.0"
   resolved "https://registry.npmjs.org/cz-conventional-changelog/-/cz-conventional-changelog-3.2.0.tgz#6aef1f892d64113343d7e455529089ac9f20e477"