diff --git a/apps/api/src/app/controllers/sf-bulk-api.controller.ts b/apps/api/src/app/controllers/sf-bulk-api.controller.ts index 082c9787c..c566e7c50 100644 --- a/apps/api/src/app/controllers/sf-bulk-api.controller.ts +++ b/apps/api/src/app/controllers/sf-bulk-api.controller.ts @@ -1,12 +1,11 @@ -import { getExceptionLog, logger } from '@jetstream/api-config'; import { BooleanQueryParamSchema, CreateJobRequestSchema } from '@jetstream/api-types'; import { HTTP } from '@jetstream/shared/constants'; -import { ensureBoolean, toBoolean } from '@jetstream/shared/utils'; +import { ensureBoolean, getErrorMessageAndStackObj, toBoolean } from '@jetstream/shared/utils'; import { NODE_STREAM_INPUT, parse as parseCsv } from 'papaparse'; -import { Readable } from 'stream'; +import { PassThrough, Readable, Transform } from 'stream'; import { z } from 'zod'; import { UserFacingError } from '../utils/error-handler'; -import { sendJson } from '../utils/response.handlers'; +import { sendJson, streamParsedCsvAsJson } from '../utils/response.handlers'; import { createRoute } from '../utils/route.utils'; export const routeDefinition = { @@ -41,6 +40,24 @@ export const routeDefinition = { }), }, }, + downloadAllResults: { + controllerFn: () => downloadAllResults, + validators: { + params: z.object({ + jobId: z.string().min(1), + }), + query: z.object({ + /** + * Optional batch ids, if not provided then all batches will be downloaded from job + * this is important because the returned batches array is not stable and the client relies on the order + */ + batchIds: z + .string() + .nullish() + .transform((val) => new Set(val?.split(',') || [])), + }), + }, + }, downloadResultsFile: { controllerFn: () => downloadResultsFile, validators: { @@ -188,7 +205,7 @@ const downloadResultsFile = createRoute( */ const downloadResults = createRoute( routeDefinition.downloadResults.validators, - async ({ params, query, jetstreamConn, requestId }, req, res, next) => { + async ({ params, query, jetstreamConn }, req, res, next) => { try { const jobId = params.jobId; const batchId = params.batchId; @@ -220,32 +237,118 @@ const downloadResults = createRoute( Readable.fromWeb(results as any).pipe(csvParseStream); } - let isFirstChunk = true; + streamParsedCsvAsJson(res, csvParseStream); + } catch (ex) { + next(new UserFacingError(ex)); + } + } +); + +/** + * Download all results from a batch job as JSON, streamed from Salesforce as CSVs, and transformed to JSON on the fly + */ +const downloadAllResults = createRoute( + routeDefinition.downloadAllResults.validators, + async ({ params, jetstreamConn, query, requestId }, req, res, next) => { + const combinedStream = new PassThrough(); + try { + const jobId = params.jobId; + let { batchIds } = query; - csvParseStream.on('data', (data) => { - data = JSON.stringify(data); - if (isFirstChunk) { - isFirstChunk = false; - data = `{"data":[${data}`; - } else { - data = `,${data}`; - } - res.write(data); - }); - csvParseStream.on('finish', () => { - res.write(']}'); - res.end(); - logger.info({ requestId }, 'Finished streaming download from Salesforce'); + const csvParseStream = parseCsv(NODE_STREAM_INPUT, { + delimiter: ',', + header: true, + skipEmptyLines: true, + transform: (data, field) => { + if (field === 'Success' || field === 'Created') { + return toBoolean(data); + } else if (field === 'Id' || field === 'Error') { + return data || null; + } + return data; + }, }); - csvParseStream.on('error', (err) => { - logger.warn({ requestId, ...getExceptionLog(err) }, 'Error streaming files from Salesforce.'); - if (!res.headersSent) { - res.status(400).json({ error: true, message: 'Error streaming files from Salesforce' }); - } else { - res.status(400).end(); + + // Fetch job to get all completed batches + const job = await jetstreamConn.bulk.getJob(jobId); + const batchIdsFromJob = new Set(job.batches.filter((batch) => batch.state === 'Completed').map((batch) => batch.id)); + + // If no batchIds provided, use all completed batches + if (batchIds.size === 0) { + batchIds = batchIdsFromJob; + } + + // Remove any provided batchIds that are not in the job or are not Completed + batchIds.forEach((batchId) => { + if (!batchIdsFromJob.has(batchId)) { + batchIds.delete(batchId); } }); + + if (batchIds.size === 0) { + throw new UserFacingError('No completed batches found in the job'); + } + + // initiate stream response through passthrough stream + streamParsedCsvAsJson(res, csvParseStream); + combinedStream.pipe(csvParseStream); + + let isFirstBatch = true; + + for (const batchId of batchIds) { + try { + const results = await jetstreamConn.bulk.downloadRecords(jobId, batchId, 'result'); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const readable = Readable.fromWeb(results as any); + + let streamToPipe: Readable; + + if (isFirstBatch) { + // First batch, include headers + streamToPipe = readable; + isFirstBatch = false; + } else { + let headerRemoved = false; + // Subsequent batches, remove headers + const removeHeaderTransform = new Transform({ + transform(chunk, encoding, callback) { + // Convert chunk to string + const data = chunk.toString(); + // If header has been removed, pass data through + if (headerRemoved) { + callback(null, chunk); + } else { + // Remove the first line (header) + const index = data.indexOf('\n'); + if (index !== -1) { + headerRemoved = true; + const dataWithoutHeader = data.slice(index + 1); + callback(null, Buffer.from(dataWithoutHeader)); + } else { + // Header line not yet complete + callback(); + } + } + }, + }); + streamToPipe = readable.pipe(removeHeaderTransform); + } + + // pipe all data through passthrough stream + await new Promise((resolve, reject) => { + streamToPipe.pipe(combinedStream, { end: false }); + streamToPipe.on('end', resolve); + streamToPipe.on('error', reject); + }); + } catch (ex) { + res.log.error({ requestId, ...getErrorMessageAndStackObj(ex) }, 'Error downloading batch results'); + } + } + // indicate end of stream - we are done pushing data + combinedStream.end(); } catch (ex) { + // combinedStream.destroy(); + combinedStream.end(); next(new UserFacingError(ex)); } } diff --git a/apps/api/src/app/routes/api.routes.ts b/apps/api/src/app/routes/api.routes.ts index f0401cb1a..040dd911f 100644 --- a/apps/api/src/app/routes/api.routes.ts +++ b/apps/api/src/app/routes/api.routes.ts @@ -139,6 +139,7 @@ routes.get('/bulk/:jobId', bulkApiController.getJob.controllerFn()); routes.delete('/bulk/:jobId/:action', bulkApiController.closeOrAbortJob.controllerFn()); routes.post('/bulk/:jobId', bulkApiController.addBatchToJob.controllerFn()); routes.post('/bulk/zip/:jobId', bulkApiController.addBatchToJobWithBinaryAttachment.controllerFn()); +routes.get('/bulk/download-all/:jobId', bulkApiController.downloadAllResults.controllerFn()); routes.get('/bulk/:jobId/:batchId', bulkApiController.downloadResults.controllerFn()); /** diff --git a/apps/api/src/app/utils/response.handlers.ts b/apps/api/src/app/utils/response.handlers.ts index 1b4770a8e..0db792fe3 100644 --- a/apps/api/src/app/utils/response.handlers.ts +++ b/apps/api/src/app/utils/response.handlers.ts @@ -5,6 +5,7 @@ import { Maybe } from '@jetstream/types'; import { SalesforceOrg } from '@prisma/client'; import { serialize } from 'cookie'; import * as express from 'express'; +import { Duplex } from 'stream'; import * as salesforceOrgsDb from '../db/salesforce-org.db'; import { Response } from '../types/types'; import { AuthenticationError, NotFoundError, UserFacingError } from './error-handler'; @@ -85,6 +86,39 @@ export function sendJson(res: Response, content?: Respon return res.json({ data: content || {} }); } +/** + * Given a CSV parse stream, stream as JSON to the client + */ +export function streamParsedCsvAsJson(res: express.Response, csvParseStream: Duplex) { + let isFirstChunk = true; + + csvParseStream.on('data', (data) => { + data = JSON.stringify(data); + if (isFirstChunk) { + isFirstChunk = false; + data = `{"data":[${data}`; + } else { + data = `,${data}`; + } + res.write(data); + }); + + csvParseStream.on('finish', () => { + res.write(']}'); + res.end(); + res.log.info({ requestId: res.locals.requestId }, 'Finished streaming CSV'); + }); + + csvParseStream.on('error', (err) => { + res.log.warn({ requestId: res.locals.requestId, ...getExceptionLog(err) }, 'Error streaming CSV.'); + if (!res.headersSent) { + res.status(400).json({ error: true, message: 'Error streaming CSV' }); + } else { + res.status(400).end(); + } + }); +} + export function blockBotHandler(req: express.Request, res: express.Response) { res.log.debug('[BLOCKED REQUEST]'); res.status(403).send('Forbidden'); diff --git a/apps/docs/docs/load/load.mdx b/apps/docs/docs/load/load.mdx index 02dd239d5..f6732abe6 100644 --- a/apps/docs/docs/load/load.mdx +++ b/apps/docs/docs/load/load.mdx @@ -111,11 +111,11 @@ There are some final options available before loading your data to Salesforce. ### API mode - **Bulk API** - Best for very large data loads - - This will use the Salesforce Bulk API 1.0 to load your data. + - This will use the [Salesforce Bulk API 1.0](https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_how_bulk_api_works.htm) to load your data. - This will take longer and is optimized for high volume. - Some types of automation may not be triggered when using the Bulk API. - **Batch API** - Best for small to medium size loads - - This uses the Batch API and is generally much faster than the Bulk API. + - This uses the [sObject Collection API](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_sobjects_collections.htm) and is generally much faster than the Bulk API. :::tip diff --git a/libs/features/load-records/src/components/load-results/LoadRecordsBulkApiResults.tsx b/libs/features/load-records/src/components/load-results/LoadRecordsBulkApiResults.tsx index d5fc05887..43ffe8b4c 100644 --- a/libs/features/load-records/src/components/load-results/LoadRecordsBulkApiResults.tsx +++ b/libs/features/load-records/src/components/load-results/LoadRecordsBulkApiResults.tsx @@ -1,7 +1,7 @@ import { css } from '@emotion/react'; import { logger } from '@jetstream/shared/client-logger'; import { ANALYTICS_KEYS } from '@jetstream/shared/constants'; -import { bulkApiAbortJob, bulkApiGetJob, bulkApiGetRecords } from '@jetstream/shared/data'; +import { bulkApiAbortJob, bulkApiGetJob, bulkApiGetRecords, bulkApiGetRecordsFromAllBatches } from '@jetstream/shared/data'; import { checkIfBulkApiJobIsDone, convertDateToLocale, useBrowserNotifications, useRollbar } from '@jetstream/shared/ui-utils'; import { decodeHtmlEntity, @@ -9,6 +9,7 @@ import { getErrorMessageAndStackObj, getSuccessOrFailureChar, pluralizeFromNumber, + splitArrayToMaxSize, } from '@jetstream/shared/utils'; import { ApiMode, @@ -17,6 +18,7 @@ import { BulkJobWithBatches, DownloadAction, DownloadModalData, + DownloadScope, DownloadType, FieldMapping, InsertUpdateUpsertDelete, @@ -28,7 +30,17 @@ import { SalesforceOrgUi, ViewModalData, } from '@jetstream/types'; -import { FileDownloadModal, Grid, ProgressRing, SalesforceLogin, Spinner, Tooltip, fireToast } from '@jetstream/ui'; +import { + ButtonGroupContainer, + FileDownloadModal, + Grid, + Icon, + ProgressRing, + SalesforceLogin, + Spinner, + Tooltip, + fireToast, +} from '@jetstream/ui'; import { LoadRecordsBulkApiResultsTable, LoadRecordsResultsModal, @@ -122,6 +134,7 @@ export const LoadRecordsBulkApiResults: FunctionComponent({ open: false, data: [], header: [], type: 'results' }); + const [downloadState, setDownloadState] = useState(null); const { notifyUser } = useBrowserNotifications(serverUrl); useEffect(() => { @@ -363,12 +376,27 @@ export const LoadRecordsBulkApiResults: FunctionComponent item.completed).length + 1} of ${batchSummary.totalBatches}`; } - async function handleDownloadOrViewRecords( - action: DownloadAction, - type: DownloadType, - batch: BulkJobBatchInfo, - batchIndex: number - ): Promise { + async function handleDownloadOrViewRecords({ + scope, + action, + type, + batch, + batchIndex, + }: + | { + scope: 'all'; + action: DownloadAction; + type: 'results'; + batch?: never; + batchIndex?: never; + } + | { + scope: 'batch'; + action: DownloadAction; + type: DownloadType; + batch: BulkJobBatchInfo; + batchIndex: number; + }): Promise { try { if (!batchSummary || !jobInfo?.id || !preparedData) { return; @@ -376,15 +404,43 @@ export const LoadRecordsBulkApiResults: FunctionComponent(selectedOrg, jobInfo.id, batch.id, 'result'); - // this should match, but will fallback to batchIndex if for some reason we cannot find the batch - const batchSummaryItem = batchSummary.batchSummary.find((item) => item.id === batch.id); - const startIdx = (batchSummaryItem?.batchNumber ?? batchIndex) * batchSize; - /** For delete, only records with a mapped Id will be included in response from SFDC */ - const records: any[] = preparedData.data - .slice(startIdx, startIdx + batchSize) - .filter((record) => (loadType !== 'DELETE' ? true : !!record.Id)); + + setDownloadState(scope); + + let results: BulkJobResultRecord[]; + let records: any[] = preparedData.data; + let removedBatches = false; + + if (scope === 'all') { + // Download results across all batches + const splitRecordsByBatchSize = splitArrayToMaxSize(records, batchSize); + jobInfo.batches.forEach((batch, i) => { + if (batch.state !== 'Completed') { + // remove batch from result records + splitRecordsByBatchSize.splice(i, 1); + removedBatches = true; + } + }); + records = splitRecordsByBatchSize.flat(); + const batchIds = jobInfo.batches.filter((batch) => batch.state === 'Completed').map((batch) => batch.id); + // download records, combine results from salesforce with actual records, open download modal + results = await bulkApiGetRecordsFromAllBatches(selectedOrg, jobInfo.id, batchIds); + /** For delete, only records with a mapped Id will be included in response from SFDC */ + records = preparedData.data.filter((record) => (loadType !== 'DELETE' ? true : !!record.Id)); + } else { + // Download results for a single batch + // download records, combine results from salesforce with actual records, open download modal + results = await bulkApiGetRecords(selectedOrg, jobInfo.id, batch.id, 'result'); + // this should match, but will fallback to batchIndex if for some reason we cannot find the batch + const batchSummaryItem = batchSummary.batchSummary.find((item) => item.id === batch.id); + const startIdx = (batchSummaryItem?.batchNumber ?? batchIndex) * batchSize; + /** + * Get records from this one batch + * For delete, only records with a mapped Id will be included in response from SFDC + */ + records = preparedData.data.slice(startIdx, startIdx + batchSize).filter((record) => (loadType !== 'DELETE' ? true : !!record.Id)); + } + const combinedResults: BulkJobResultRecord[] = []; results.forEach((resultRecord, i) => { @@ -398,11 +454,11 @@ export const LoadRecordsBulkApiResults: FunctionComponent
+ {batchSummary && status === STATUSES.FINISHED && batchSummary.totalBatches > 1 && ( +
+ {downloadState === 'all' && } + + + + +
+ )} {ABORTABLE_STATUSES.has(status) && ( - -
+ +
+ {Array.isArray(rows) && ( + <> + {formatNumber(rows.length)} {pluralizeFromNumber('Record', rows.length)} + + )} +
+
+ + +
+
} onClose={onClose} > diff --git a/libs/types/src/lib/ui/load-records-results-types.ts b/libs/types/src/lib/ui/load-records-results-types.ts index 85662876c..25fafbf7e 100644 --- a/libs/types/src/lib/ui/load-records-results-types.ts +++ b/libs/types/src/lib/ui/load-records-results-types.ts @@ -1,3 +1,4 @@ +export type DownloadScope = 'all' | 'batch'; export type DownloadAction = 'view' | 'download'; export type DownloadType = 'results' | 'failures';