Skip to content

Commit

Permalink
Allow viewing/downloading results across all batches
Browse files Browse the repository at this point in the history
Allow user to stream all batch results from Salesforce in a single request to view combined results instead of having to view/download individual results, which is annoying when there are a lot of batches

resolves #1082
  • Loading branch information
paustint committed Nov 23, 2024
1 parent 9b236dc commit 3730be6
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 67 deletions.
155 changes: 129 additions & 26 deletions apps/api/src/app/controllers/sf-bulk-api.controller.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { getExceptionLog, logger } from '@jetstream/api-config';
import { BooleanQueryParamSchema, CreateJobRequestSchema } from '@jetstream/api-types';
import { HTTP } from '@jetstream/shared/constants';
import { ensureBoolean, toBoolean } from '@jetstream/shared/utils';
import { ensureBoolean, getErrorMessageAndStackObj, toBoolean } from '@jetstream/shared/utils';
import { NODE_STREAM_INPUT, parse as parseCsv } from 'papaparse';
import { Readable } from 'stream';
import { PassThrough, Readable, Transform } from 'stream';
import { z } from 'zod';
import { UserFacingError } from '../utils/error-handler';
import { sendJson } from '../utils/response.handlers';
import { sendJson, streamParsedCsvAsJson } from '../utils/response.handlers';
import { createRoute } from '../utils/route.utils';

export const routeDefinition = {
Expand Down Expand Up @@ -41,6 +40,24 @@ export const routeDefinition = {
}),
},
},
downloadAllResults: {
controllerFn: () => downloadAllResults,
validators: {
params: z.object({
jobId: z.string().min(1),
}),
query: z.object({
/**
* Optional batch ids, if not provided then all batches will be downloaded from job
* this is important because the returned batches array is not stable and the client relies on the order
*/
batchIds: z
.string()
.nullish()
.transform((val) => new Set(val?.split(',') || [])),
}),
},
},
downloadResultsFile: {
controllerFn: () => downloadResultsFile,
validators: {
Expand Down Expand Up @@ -188,7 +205,7 @@ const downloadResultsFile = createRoute(
*/
const downloadResults = createRoute(
routeDefinition.downloadResults.validators,
async ({ params, query, jetstreamConn, requestId }, req, res, next) => {
async ({ params, query, jetstreamConn }, req, res, next) => {
try {
const jobId = params.jobId;
const batchId = params.batchId;
Expand Down Expand Up @@ -220,32 +237,118 @@ const downloadResults = createRoute(
Readable.fromWeb(results as any).pipe(csvParseStream);
}

let isFirstChunk = true;
streamParsedCsvAsJson(res, csvParseStream);
} catch (ex) {
next(new UserFacingError(ex));
}
}
);

/**
* Download all results from a batch job as JSON, streamed from Salesforce as CSVs, and transformed to JSON on the fly
*/
const downloadAllResults = createRoute(
routeDefinition.downloadAllResults.validators,
async ({ params, jetstreamConn, query, requestId }, req, res, next) => {
const combinedStream = new PassThrough();
try {
const jobId = params.jobId;
let { batchIds } = query;

csvParseStream.on('data', (data) => {
data = JSON.stringify(data);
if (isFirstChunk) {
isFirstChunk = false;
data = `{"data":[${data}`;
} else {
data = `,${data}`;
}
res.write(data);
});
csvParseStream.on('finish', () => {
res.write(']}');
res.end();
logger.info({ requestId }, 'Finished streaming download from Salesforce');
const csvParseStream = parseCsv(NODE_STREAM_INPUT, {
delimiter: ',',
header: true,
skipEmptyLines: true,
transform: (data, field) => {
if (field === 'Success' || field === 'Created') {
return toBoolean(data);
} else if (field === 'Id' || field === 'Error') {
return data || null;
}
return data;
},
});
csvParseStream.on('error', (err) => {
logger.warn({ requestId, ...getExceptionLog(err) }, 'Error streaming files from Salesforce.');
if (!res.headersSent) {
res.status(400).json({ error: true, message: 'Error streaming files from Salesforce' });
} else {
res.status(400).end();

// Fetch job to get all completed batches
const job = await jetstreamConn.bulk.getJob(jobId);
const batchIdsFromJob = new Set(job.batches.filter((batch) => batch.state === 'Completed').map((batch) => batch.id));

// If no batchIds provided, use all completed batches
if (batchIds.size === 0) {
batchIds = batchIdsFromJob;
}

// Remove any provided batchIds that are not in the job or are not Completed
batchIds.forEach((batchId) => {
if (!batchIdsFromJob.has(batchId)) {
batchIds.delete(batchId);
}
});

if (batchIds.size === 0) {
throw new UserFacingError('No completed batches found in the job');
}

// initiate stream response through passthrough stream
streamParsedCsvAsJson(res, csvParseStream);
combinedStream.pipe(csvParseStream);

let isFirstBatch = true;

for (const batchId of batchIds) {
try {
const results = await jetstreamConn.bulk.downloadRecords(jobId, batchId, 'result');
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const readable = Readable.fromWeb(results as any);

let streamToPipe: Readable;

if (isFirstBatch) {
// First batch, include headers
streamToPipe = readable;
isFirstBatch = false;
} else {
let headerRemoved = false;
// Subsequent batches, remove headers
const removeHeaderTransform = new Transform({
transform(chunk, encoding, callback) {
// Convert chunk to string
const data = chunk.toString();
// If header has been removed, pass data through
if (headerRemoved) {
callback(null, chunk);
} else {
// Remove the first line (header)
const index = data.indexOf('\n');
if (index !== -1) {
headerRemoved = true;
const dataWithoutHeader = data.slice(index + 1);
callback(null, Buffer.from(dataWithoutHeader));
} else {
// Header line not yet complete
callback();
}
}
},
});
streamToPipe = readable.pipe(removeHeaderTransform);
}

// pipe all data through passthrough stream
await new Promise((resolve, reject) => {
streamToPipe.pipe(combinedStream, { end: false });
streamToPipe.on('end', resolve);
streamToPipe.on('error', reject);
});
} catch (ex) {
res.log.error({ requestId, ...getErrorMessageAndStackObj(ex) }, 'Error downloading batch results');
}
}
// indicate end of stream - we are done pushing data
combinedStream.end();
} catch (ex) {
// combinedStream.destroy();
combinedStream.end();
next(new UserFacingError(ex));
}
}
Expand Down
1 change: 1 addition & 0 deletions apps/api/src/app/routes/api.routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ routes.get('/bulk/:jobId', bulkApiController.getJob.controllerFn());
routes.delete('/bulk/:jobId/:action', bulkApiController.closeOrAbortJob.controllerFn());
routes.post('/bulk/:jobId', bulkApiController.addBatchToJob.controllerFn());
routes.post('/bulk/zip/:jobId', bulkApiController.addBatchToJobWithBinaryAttachment.controllerFn());
routes.get('/bulk/download-all/:jobId', bulkApiController.downloadAllResults.controllerFn());
routes.get('/bulk/:jobId/:batchId', bulkApiController.downloadResults.controllerFn());

/**
Expand Down
34 changes: 34 additions & 0 deletions apps/api/src/app/utils/response.handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Maybe } from '@jetstream/types';
import { SalesforceOrg } from '@prisma/client';
import { serialize } from 'cookie';
import * as express from 'express';
import { Duplex } from 'stream';
import * as salesforceOrgsDb from '../db/salesforce-org.db';
import { Response } from '../types/types';
import { AuthenticationError, NotFoundError, UserFacingError } from './error-handler';
Expand Down Expand Up @@ -85,6 +86,39 @@ export function sendJson<ResponseType = unknown>(res: Response, content?: Respon
return res.json({ data: content || {} });
}

/**
* Given a CSV parse stream, stream as JSON to the client
*/
export function streamParsedCsvAsJson(res: express.Response, csvParseStream: Duplex) {
let isFirstChunk = true;

csvParseStream.on('data', (data) => {
data = JSON.stringify(data);
if (isFirstChunk) {
isFirstChunk = false;
data = `{"data":[${data}`;
} else {
data = `,${data}`;
}
res.write(data);
});

csvParseStream.on('finish', () => {
res.write(']}');
res.end();
res.log.info({ requestId: res.locals.requestId }, 'Finished streaming CSV');
});

csvParseStream.on('error', (err) => {
res.log.warn({ requestId: res.locals.requestId, ...getExceptionLog(err) }, 'Error streaming CSV.');
if (!res.headersSent) {
res.status(400).json({ error: true, message: 'Error streaming CSV' });
} else {
res.status(400).end();
}
});
}

export function blockBotHandler(req: express.Request, res: express.Response) {
res.log.debug('[BLOCKED REQUEST]');
res.status(403).send('Forbidden');
Expand Down
4 changes: 2 additions & 2 deletions apps/docs/docs/load/load.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ There are some final options available before loading your data to Salesforce.
### API mode

- **Bulk API** - Best for very large data loads
- This will use the Salesforce Bulk API 1.0 to load your data.
- This will use the [Salesforce Bulk API 1.0](https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_how_bulk_api_works.htm) to load your data.
- This will take longer and is optimized for high volume.
- Some types of automation may not be triggered when using the Bulk API.
- **Batch API** - Best for small to medium size loads
- This uses the Batch API and is generally much faster than the Bulk API.
- This uses the [sObject Collection API](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections.htm) and is generally much faster than the Bulk API.

:::tip

Expand Down
Loading

0 comments on commit 3730be6

Please sign in to comment.