Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bulk batching wide fIles (cli/#1460) #285

Merged
merged 3 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
"@salesforce/ts-types": "^1.5.20",
"chalk": "^4.1.0",
"csv-parse": "^4.16.3",
"csv-stringify": "^6.0.5",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jsyk, we need to get approval for all 3rd party libs. This one looks like it shouldn't be a problem.

"tslib": "^2"
},
"devDependencies": {
Expand Down
20 changes: 18 additions & 2 deletions src/batcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import parse = require('csv-parse');
import { Batch, BatchInfo, BatchResultInfo, JobInfo } from 'jsforce';
import { UX } from '@salesforce/command';
import { Job } from 'jsforce/job';
import { stringify } from 'csv-stringify/sync';

// max rows per file in Bulk 1.0
const BATCH_RECORDS_LIMIT = 10000;
/// max characters/bytes per file in Bulk 1.0
const BATCH_BYTES_LIMIT = 10000000;
const POLL_FREQUENCY_MS = 5000;

Messages.importMessagesDirectory(__dirname);
Expand Down Expand Up @@ -295,6 +299,8 @@ export class Batcher {
// split all records into batches
const batches: Batches = [];
let batchIndex = 0;
let batchBytes = 0;
let batchHeaderBytes = 0;
batches[batchIndex] = [];

return await new Promise((resolve, reject) => {
Expand All @@ -309,14 +315,24 @@ export class Batcher {
readStream.pipe(parser);

parser.on('data', (element: BatchEntry) => {
batches[batchIndex].push(element);
if (batches[batchIndex].length === BATCH_RECORDS_LIMIT) {
if (!batchHeaderBytes) {
// capture header byte length
batchHeaderBytes = Buffer.byteLength(stringify([Object.keys(element)]) + '\n', 'utf8');
batchBytes = batchHeaderBytes;
}
// capture row byte length
const rowBytes = Buffer.byteLength(stringify([Object.values(element)]) + '\n', 'utf8');
if (batches[batchIndex].length === BATCH_RECORDS_LIMIT || rowBytes + batchBytes > BATCH_BYTES_LIMIT) {
// TODO: we can start processing this batch here
// we need event listeners to remove all of the `await new Promise`
// next batch
batchIndex++;
batches[batchIndex] = [];
// reset file size to just the headers
batchBytes = batchHeaderBytes;
}
batchBytes += rowBytes;
batches[batchIndex].push(element);
});

parser.on('error', (err) => {
Expand Down
34 changes: 34 additions & 0 deletions test/commands/batcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,40 @@ describe('batcher', () => {
expect(exitSpy.notCalled).to.equal(true);
});

it('Should not create another batch for exactly 10000 records', async () => {
inStream.push(`name , field1, field2${os.EOL}`);
for (let i = 0; i < 10000; i++) {
inStream.push(`obj1,val1,val2${os.EOL}`);
}
inStream.push(null);
// @ts-ignore private method
const batches = batcher.splitIntoBatches(inStream);
const result = await batches;
expect(result.length).to.equal(1);
expect(result[0].length).to.equal(10000);
expect(exitSpy.notCalled).to.equal(true);
});

it('Should create another batch after 10MB of data', async () => {
// generate a large string for use as a field value
let bigField = '';
while (bigField.length <= 2000) {
bigField += 'l';
}
inStream.push(`"n""am""e",field1,field2${os.EOL}`);
for (let i = 0; i < 5000; i++) {
inStream.push(`obj1,"v""al""1",${bigField}${os.EOL}`);
}
inStream.push(null);
// @ts-ignore private method
const batches = batcher.splitIntoBatches(inStream);
const result = await batches;
expect(result.length).to.equal(2);
expect(result[0].length).to.equal(4952);
expect(result[1].length).to.equal(48);
expect(exitSpy.notCalled).to.equal(true);
});

it('should be able to read through line breaks within fields', async () => {
inStream.push(`name,field1,field2${os.EOL}`);
inStream.push(`obj1,"val1\n\nval1","\nval2"${os.EOL}`);
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,11 @@ csv-stringify@^1.0.4:
dependencies:
lodash.get "~4.4.2"

csv-stringify@^6.0.5:
version "6.0.5"
resolved "https://registry.yarnpkg.com/csv-stringify/-/csv-stringify-6.0.5.tgz#3474a4fe784249eb5c91d455f616e1f70961cdc0"
integrity sha512-7xpV3uweJCFF/Ssn56l3xsR/k2r3UqszwjEhej9qEn2cCPzyK1WyHCgoUVzBA792x8HbwonNX7CU9XM2K5s5yw==

[email protected]:
version "3.2.0"
resolved "https://registry.npmjs.org/cz-conventional-changelog/-/cz-conventional-changelog-3.2.0.tgz#6aef1f892d64113343d7e455529089ac9f20e477"
Expand Down